diff --git a/nacossync-console/pom.xml b/nacossync-console/pom.xml index 09678f6b..e7110af3 100644 --- a/nacossync-console/pom.xml +++ b/nacossync-console/pom.xml @@ -16,7 +16,7 @@ nacossync-parent com.alibaba.nacossync - 0.3.3 + 0.3.4 4.0.0 diff --git a/nacossync-distribution/pom.xml b/nacossync-distribution/pom.xml index 08e960ca..964897dc 100644 --- a/nacossync-distribution/pom.xml +++ b/nacossync-distribution/pom.xml @@ -5,7 +5,7 @@ nacossync-parent com.alibaba.nacossync - 0.3.3 + 0.3.4 4.0.0 pom diff --git a/nacossync-test/pom.xml b/nacossync-test/pom.xml index 2e9ad586..2bdd2f52 100644 --- a/nacossync-test/pom.xml +++ b/nacossync-test/pom.xml @@ -17,7 +17,7 @@ nacossync-parent com.alibaba.nacossync - 0.3.3 + 0.3.4 ../pom.xml 4.0.0 @@ -28,7 +28,6 @@ UTF-8 - @@ -40,7 +39,7 @@ com.alibaba.nacossync nacossync-worker - 0.3.3 + 0.3.4 org.springframework.boot diff --git a/nacossync-worker/pom.xml b/nacossync-worker/pom.xml index 5dbd3ddf..dff2c84d 100644 --- a/nacossync-worker/pom.xml +++ b/nacossync-worker/pom.xml @@ -16,11 +16,11 @@ nacossync-parent com.alibaba.nacossync - 0.3.3 + 0.3.4 4.0.0 nacossync-worker - 0.3.3 + 0.3.4 3.4.9 4.1.0 diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java index a1f760b8..4b8cf2f6 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java @@ -16,17 +16,6 @@ */ package com.alibaba.nacossync.cache; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; - import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.alibaba.nacossync.constant.ClusterTypeEnum; @@ -36,6 +25,16 @@ import com.alibaba.nacossync.pojo.model.ClusterDO; import com.alibaba.nacossync.pojo.model.TaskDO; import com.alibaba.nacossync.util.SkyWalkerUtil; +import org.jboss.netty.util.internal.ThreadLocalRandom; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; /** * @author NacosSync @@ -52,7 +51,7 @@ public class SkyWalkerCacheServices { public String getClusterConnectKey(String clusterId) { List allClusterConnectKey = getAllClusterConnectKey(clusterId); - Random random = new Random(); + Random random = new ThreadLocalRandom(); return allClusterConnectKey.get(random.nextInt(allClusterConnectKey.size())); } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java similarity index 75% rename from nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolder.java rename to nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java index 2e9af630..e35f371f 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java @@ -14,19 +14,21 @@ import com.alibaba.nacossync.cache.SkyWalkerCacheServices; import com.google.common.base.Joiner; +import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.util.Strings; +import org.springframework.beans.factory.annotation.Autowired; + import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; /** * @author paderlol * @date 2018-12-24 22:08 */ @Slf4j -public abstract class AbstractServerHolder implements Holder { +public abstract class AbstractServerHolderImpl implements Holder { private final Map serviceMap = new ConcurrentHashMap<>(); @Autowired @@ -34,16 +36,16 @@ public abstract class AbstractServerHolder implements Holder { @Override public T get(String clusterId, String namespace) { - final String finalNamespace = Optional.ofNullable(namespace).orElse(""); - String key = Joiner.on("_").join(clusterId,finalNamespace); - log.info("starting create cluster server,clusterId={}", clusterId); + final String finalNamespace = Optional.ofNullable(namespace).orElse(Strings.EMPTY); + String key = Joiner.on("_").join(clusterId, finalNamespace); + serviceMap.computeIfAbsent(key, clusterKey -> { try { - return createServer(clusterId, - () -> skyWalkerCacheServices.getClusterConnectKey(clusterId), - finalNamespace); + log.info("Starting create cluster server, clusterId={}", clusterId); + return createServer(clusterId, () -> skyWalkerCacheServices.getClusterConnectKey(clusterId), + finalNamespace); } catch (Exception e) { - log.error(String.format("clusterId=%s,start server failed", clusterId), e); + log.error(String.format("clusterId=%s, start server failed", clusterId), e); return null; } }); @@ -52,16 +54,14 @@ public T get(String clusterId, String namespace) { } /** - * create real cluster client instance + * Create real cluster client instance * * @param clusterId cluster id * @param serverAddressSupplier server address * @param namespace name space * @return cluster client instance */ - abstract T createServer(String clusterId, Supplier serverAddressSupplier, - String namespace) - throws Exception; - + abstract T createServer(String clusterId, Supplier serverAddressSupplier, String namespace) + throws Exception; } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ConsulServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ConsulServerHolder.java index 1b1024df..6d62076a 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ConsulServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ConsulServerHolder.java @@ -25,7 +25,7 @@ */ @Service @Slf4j -public class ConsulServerHolder extends AbstractServerHolder { +public class ConsulServerHolder extends AbstractServerHolderImpl { public static final String HTTP = "http://"; diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java index a8da6697..e795419a 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java @@ -29,7 +29,7 @@ */ @Service @Slf4j -public class EurekaServerHolder extends AbstractServerHolder { +public class EurekaServerHolder extends AbstractServerHolderImpl { @Override EurekaNamingService createServer(String clusterId, Supplier serverAddressSupplier, String namespace) throws Exception { RestTemplateTransportClientFactory restTemplateTransportClientFactory = diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/Holder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/Holder.java index 5435f657..2b004167 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/Holder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/Holder.java @@ -13,7 +13,7 @@ package com.alibaba.nacossync.extension.holder; /** - * cluster client service + * Cluster client service * @author paderlol * @date 2018-12-24 21:59 */ diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java index e19a47e8..1b621cf1 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java @@ -27,7 +27,7 @@ */ @Service @Slf4j -public class NacosServerHolder extends AbstractServerHolder { +public class NacosServerHolder extends AbstractServerHolderImpl { @Override NamingService createServer(String clusterId, Supplier serverAddressSupplier, String namespace) throws Exception { diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ZookeeperServerHolder.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ZookeeperServerHolder.java index 2069e227..3ca2451e 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ZookeeperServerHolder.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/ZookeeperServerHolder.java @@ -28,7 +28,7 @@ */ @Service @Slf4j -public class ZookeeperServerHolder extends AbstractServerHolder { +public class ZookeeperServerHolder extends AbstractServerHolderImpl { @Override diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java index 041e8113..366d090a 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java @@ -74,6 +74,7 @@ public boolean delete(TaskDO taskDO) { List allInstances = destNamingService.getAllInstances(taskDO.getServiceName()); for (Instance instance : allInstances) { if (needDelete(instance.getMetadata(), taskDO)) { + destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort()); } } @@ -94,26 +95,26 @@ public boolean sync(TaskDO taskDO) { Response> response = consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT); List healthServiceList = response.getValue(); - Set instanceKeySet = new HashSet<>(); + Set instanceKeys = new HashSet<>(); for (HealthService healthService : healthServiceList) { if (needSync(ConsulUtils.transferMetadata(healthService.getService().getTags()))) { - destNamingService.registerInstance(taskDO.getServiceName(), buildSyncInstance(healthService, taskDO)); - instanceKeySet.add(composeInstanceKey(healthService.getService().getAddress(), + instanceKeys.add(composeInstanceKey(healthService.getService().getAddress(), healthService.getService().getPort())); } } List allInstances = destNamingService.getAllInstances(taskDO.getServiceName()); for (Instance instance : allInstances) { if (needDelete(instance.getMetadata(), taskDO) - && !instanceKeySet.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) { + && !instanceKeys.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) { + destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort()); } } specialSyncEventBus.subscribe(taskDO, this::sync); } catch (Exception e) { - log.error("sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e); + log.error("Sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); return false; } diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java index 72bd70b2..3f7c053b 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/EurekaSyncToNacosServiceImpl.java @@ -12,6 +12,7 @@ */ package com.alibaba.nacossync.extension.impl; +import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacossync.cache.SkyWalkerCacheServices; @@ -29,10 +30,12 @@ import com.netflix.appinfo.InstanceInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.CollectionUtils; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * eureka @@ -44,8 +47,7 @@ @NacosSyncService(sourceCluster = ClusterTypeEnum.EUREKA, destinationCluster = ClusterTypeEnum.NACOS) public class EurekaSyncToNacosServiceImpl implements SyncService { - @Autowired - private MetricsManager metricsManager; + private final MetricsManager metricsManager; private final EurekaServerHolder eurekaServerHolder; private final SkyWalkerCacheServices skyWalkerCacheServices; @@ -55,11 +57,14 @@ public class EurekaSyncToNacosServiceImpl implements SyncService { private final SpecialSyncEventBus specialSyncEventBus; @Autowired - public EurekaSyncToNacosServiceImpl(EurekaServerHolder eurekaServerHolder, SkyWalkerCacheServices skyWalkerCacheServices, NacosServerHolder nacosServerHolder, SpecialSyncEventBus specialSyncEventBus) { + public EurekaSyncToNacosServiceImpl(EurekaServerHolder eurekaServerHolder, + SkyWalkerCacheServices skyWalkerCacheServices, NacosServerHolder nacosServerHolder, + SpecialSyncEventBus specialSyncEventBus, MetricsManager metricsManager) { this.eurekaServerHolder = eurekaServerHolder; this.skyWalkerCacheServices = skyWalkerCacheServices; this.nacosServerHolder = nacosServerHolder; this.specialSyncEventBus = specialSyncEventBus; + this.metricsManager = metricsManager; } @Override @@ -69,14 +74,10 @@ public boolean delete(TaskDO taskDO) { specialSyncEventBus.unsubscribe(taskDO); NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace()); List allInstances = destNamingService.getAllInstances(taskDO.getServiceName()); - for (Instance instance : allInstances) { - if (needDelete(instance.getMetadata(), taskDO)) { - destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort()); - } - } + deleteAllInstance(taskDO, destNamingService, allInstances); } catch (Exception e) { - log.error("delete task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e); + log.error("delete a task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR); return false; } @@ -88,15 +89,19 @@ public boolean sync(TaskDO taskDO) { try { EurekaNamingService eurekaNamingService = eurekaServerHolder.get(taskDO.getSourceClusterId(), null); NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null); - List instanceInfos = - eurekaNamingService.getApplications(taskDO.getServiceName()); - if (instanceInfos != null) { + List instanceInfos = eurekaNamingService.getApplications(taskDO.getServiceName()); + List allInstances = destNamingService.getAllInstances(taskDO.getServiceName()); + + if (Objects.nonNull(instanceInfos)) { for (InstanceInfo instanceInfo : instanceInfos) { if (needSync(instanceInfo.getMetadata())) { - if (InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus())) { + if (CollectionUtils.isEmpty(allInstances) + || isExistInNacosInstance(allInstances, instanceInfo)) { destNamingService.registerInstance(taskDO.getServiceName(), buildSyncInstance(instanceInfo, taskDO)); } else { + log.info("Remove invalid service instance from Nacos, serviceName={}, Ip={}, port={}", + instanceInfo.getAppName(), instanceInfo.getIPAddr(), instanceInfo.getPort()); destNamingService.deregisterInstance(instanceInfo.getAppName(), instanceInfo.getIPAddr(), instanceInfo.getPort()); } @@ -104,8 +109,7 @@ public boolean sync(TaskDO taskDO) { } } else { - metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR); - throw new RuntimeException("trying to connect to the server failed"); + deleteAllInstance(taskDO, destNamingService, allInstances); } specialSyncEventBus.subscribe(taskDO, this::sync); } catch (Exception e) { @@ -116,6 +120,21 @@ public boolean sync(TaskDO taskDO) { return true; } + private boolean isExistInNacosInstance(List allInstances, InstanceInfo instanceInfo) { + return allInstances.stream().anyMatch(instance -> instance.getIp().equals(instanceInfo.getIPAddr()) + && instance.getPort() == instanceInfo.getPort()); + } + + private void deleteAllInstance(TaskDO taskDO, NamingService destNamingService, List allInstances) + throws NacosException { + for (Instance instance : allInstances) { + if (needDelete(instance.getMetadata(), taskDO)) { + destNamingService.deregisterInstance(taskDO.getServiceName(), instance); + } + + } + } + private Instance buildSyncInstance(InstanceInfo instance, TaskDO taskDO) { Instance temp = new Instance(); temp.setIp(instance.getIPAddr()); @@ -123,8 +142,7 @@ private Instance buildSyncInstance(InstanceInfo instance, TaskDO taskDO) { temp.setServiceName(instance.getAppName()); temp.setHealthy(true); - Map metaData = new HashMap<>(); - metaData.putAll(instance.getMetadata()); + Map metaData = new HashMap<>(instance.getMetadata()); metaData.put(SkyWalkerConstants.DEST_CLUSTERID_KEY, taskDO.getDestClusterId()); metaData.put(SkyWalkerConstants.SYNC_SOURCE_KEY, skyWalkerCacheServices.getClusterType(taskDO.getSourceClusterId()).getCode()); diff --git a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java index 2e5d3461..64184b73 100644 --- a/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java +++ b/nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/NacosSyncToConsulServiceImpl.java @@ -86,7 +86,7 @@ public boolean delete(TaskDO taskDO) { } } } catch (Exception e) { - log.error("delete task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e); + log.error("delete a task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e); metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR); return false; } diff --git a/pom.xml b/pom.xml index 005306b8..4a49abd1 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ com.alibaba.nacossync nacossync-parent - 0.3.3 + 0.3.4 nacossync-console nacossync-worker @@ -37,7 +37,7 @@ git@github.com:nacos-group/nacos-sync.git scm:git@github.com:nacos-group/nacos-sync.git scm:git@github.com:nacos-group/nacos-sync.git - nacossync-0.3.3 + nacossync-0.3.4 @@ -73,7 +73,7 @@ com.alibaba.nacossync nacossync-worker - 0.3.3 + 0.3.4