diff --git a/core/src/main/java/com/alibaba/nacos/core/auth/RemoteRequestAuthFilter.java b/core/src/main/java/com/alibaba/nacos/core/auth/RemoteRequestAuthFilter.java index f5fbd849e95..e72b256c3d9 100644 --- a/core/src/main/java/com/alibaba/nacos/core/auth/RemoteRequestAuthFilter.java +++ b/core/src/main/java/com/alibaba/nacos/core/auth/RemoteRequestAuthFilter.java @@ -28,7 +28,7 @@ import com.alibaba.nacos.auth.parser.ResourceParser; import com.alibaba.nacos.common.utils.ExceptionUtil; import com.alibaba.nacos.core.code.ControllerMethodsCache; -import com.alibaba.nacos.core.remote.RequestFilter; +import com.alibaba.nacos.core.remote.AbstractRequestFilter; import com.alibaba.nacos.core.utils.Loggers; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +46,7 @@ * @version $Id: RemoteRequestAuthFilter.java, v 0.1 2020年09月14日 12:38 PM liuzunfei Exp $ */ @Component -public class RemoteRequestAuthFilter extends RequestFilter { +public class RemoteRequestAuthFilter extends AbstractRequestFilter { @Autowired private AuthConfigs authConfigs; diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RequestFilter.java b/core/src/main/java/com/alibaba/nacos/core/remote/AbstractRequestFilter.java similarity index 87% rename from core/src/main/java/com/alibaba/nacos/core/remote/RequestFilter.java rename to core/src/main/java/com/alibaba/nacos/core/remote/AbstractRequestFilter.java index b4aefe2d5f7..26e97b7c3f1 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RequestFilter.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/AbstractRequestFilter.java @@ -29,12 +29,12 @@ * @author liuzunfei * @version $Id: RequestFilter.java, v 0.1 2020年09月14日 11:46 AM liuzunfei Exp $ */ -public abstract class RequestFilter { +public abstract class AbstractRequestFilter { @Autowired private RequestFilters requestFilters; - public RequestFilter() { + public AbstractRequestFilter() { } @PostConstruct @@ -45,8 +45,9 @@ public void init() { /** * filter request. * - * @param request request. - * @param meta request meta. + * @param request request + * @param meta request meta + * @param handlerClazz handler class * @return response */ protected abstract Response filter(Request request, RequestMeta meta, Class handlerClazz); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RequestFilters.java b/core/src/main/java/com/alibaba/nacos/core/remote/RequestFilters.java index 28c0a749364..03fd52fbd80 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RequestFilters.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RequestFilters.java @@ -30,9 +30,9 @@ @Service public class RequestFilters { - List filters = new ArrayList(); + List filters = new ArrayList(); - public void registerFilter(RequestFilter requestFilter) { + public void registerFilter(AbstractRequestFilter requestFilter) { filters.add(requestFilter); } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandler.java b/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandler.java index bea10151696..aca2115a7cf 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandler.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/RequestHandler.java @@ -53,7 +53,7 @@ public void init() { * @throws NacosException nacos exception when handle request has problem. */ public Response handleRequest(T request, RequestMeta meta) throws NacosException { - for (RequestFilter filter : requestFilters.filters) { + for (AbstractRequestFilter filter : requestFilters.filters) { Response filterResult = filter.filter(request, meta, this.getClass()); if (filterResult != null && !filterResult.isSuccess()) { return filterResult; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java index 53a3a0e8812..e62f4175aa9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/controllers/InstanceController.java @@ -19,8 +19,6 @@ import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.CommonParams; -import com.alibaba.nacos.api.naming.NamingResponseCode; -import com.alibaba.nacos.api.naming.PreservedMetadataKeys; import com.alibaba.nacos.api.naming.utils.NamingUtils; import com.alibaba.nacos.auth.annotation.Secured; import com.alibaba.nacos.auth.common.ActionTypes; @@ -73,6 +71,7 @@ */ @RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") +@SuppressWarnings("PMD.RemoveCommentedCodeRule") public class InstanceController { @Autowired @@ -365,48 +364,52 @@ public ObjectNode beat(HttpServletRequest request) throws Exception { String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); - Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); - if (instance == null) { - if (clientBeat == null) { - result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); - return result; - } - - Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " - + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); - - instance = new Instance(); - instance.setPort(clientBeat.getPort()); - instance.setIp(clientBeat.getIp()); - instance.setWeight(clientBeat.getWeight()); - instance.setMetadata(clientBeat.getMetadata()); - instance.setClusterName(clusterName); - instance.setServiceName(serviceName); - instance.setInstanceId(instance.getInstanceId()); - instance.setEphemeral(clientBeat.isEphemeral()); - - serviceManager.registerInstance(namespaceId, serviceName, instance); - } - - Service service = serviceManager.getService(namespaceId, serviceName); - - if (service == null) { - throw new NacosException(NacosException.SERVER_ERROR, - "service not found: " + serviceName + "@" + namespaceId); - } - if (clientBeat == null) { - clientBeat = new RsInfo(); - clientBeat.setIp(ip); - clientBeat.setPort(port); - clientBeat.setCluster(clusterName); - } - service.processClientBeat(clientBeat); - - result.put(CommonParams.CODE, NamingResponseCode.OK); - if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { - result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); - } + // Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); + // + // if (instance == null) { + // if (clientBeat == null) { + // result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); + // return result; + // } + // + // Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + // + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); + // + // instance = new Instance(); + // instance.setPort(clientBeat.getPort()); + // instance.setIp(clientBeat.getIp()); + // instance.setWeight(clientBeat.getWeight()); + // instance.setMetadata(clientBeat.getMetadata()); + // instance.setClusterName(clusterName); + // instance.setServiceName(serviceName); + // instance.setInstanceId(instance.getInstanceId()); + // instance.setEphemeral(clientBeat.isEphemeral()); + // + // serviceManager.registerInstance(namespaceId, serviceName, instance); + // } + // + // Service service = serviceManager.getService(namespaceId, serviceName); + // + // if (service == null) { + // throw new NacosException(NacosException.SERVER_ERROR, + // "service not found: " + serviceName + "@" + namespaceId); + // } + // if (clientBeat == null) { + // clientBeat = new RsInfo(); + // clientBeat.setIp(ip); + // clientBeat.setPort(port); + // clientBeat.setCluster(clusterName); + // } + // service.processClientBeat(clientBeat); + // result.put(CommonParams.CODE, NamingResponseCode.OK); + // if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { + // result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); + // } + int resultCode = instanceService.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat); + result.put(CommonParams.CODE, resultCode); + result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, + instanceService.getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName)); result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceService.java b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceService.java index 2d5fa8bcfcb..be48a096fa0 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceService.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/InstanceService.java @@ -16,14 +16,20 @@ package com.alibaba.nacos.naming.core; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingResponseCode; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.naming.core.v2.ServiceManager; import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; import com.alibaba.nacos.naming.core.v2.client.manager.impl.IpPortBasedClientManager; import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.core.v2.service.ClientOperationService; +import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessorV2; +import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor; +import com.alibaba.nacos.naming.healthcheck.RsInfo; import com.alibaba.nacos.naming.misc.Loggers; import com.alibaba.nacos.naming.pojo.Subscriber; import com.alibaba.nacos.naming.utils.ServiceUtil; @@ -109,6 +115,63 @@ public ServiceInfo listInstance(String namespaceId, String serviceName, Subscrib return ServiceUtil.filterInstances(serviceInfo, cluster, healthOnly); } + /** + * Handle beat request. + * + * @param namespaceId namespace + * @param serviceName service name + * @param ip ip of instance + * @param port port of instance + * @param cluster cluster of instance + * @param clientBeat client beat info + * @return result code + * @throws NacosException nacos exception when service non-exist and client beat info is null + */ + public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster, + RsInfo clientBeat) throws NacosException { + String groupName = NamingUtils.getGroupName(serviceName); + String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName); + String clientId = ip + ":" + port; + IpPortBasedClient client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId); + if (null == client) { + if (null == clientBeat) { + return NamingResponseCode.RESOURCE_NOT_FOUND; + } + Instance instance = new Instance(); + instance.setPort(clientBeat.getPort()); + instance.setIp(clientBeat.getIp()); + instance.setWeight(clientBeat.getWeight()); + instance.setMetadata(clientBeat.getMetadata()); + instance.setClusterName(clientBeat.getCluster()); + instance.setServiceName(serviceName); + instance.setInstanceId(instance.getInstanceId()); + instance.setEphemeral(clientBeat.isEphemeral()); + registerInstance(namespaceId, serviceName, instance); + client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId); + } + Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped); + if (!ServiceManager.getInstance().containSingleton(service)) { + throw new NacosException(NacosException.SERVER_ERROR, + "service not found: " + serviceName + "@" + namespaceId); + } + if (null == clientBeat) { + clientBeat = new RsInfo(); + clientBeat.setIp(ip); + clientBeat.setPort(port); + clientBeat.setCluster(cluster); + clientBeat.setServiceName(serviceName); + } + ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client); + HealthCheckReactor.scheduleNow(beatProcessor); + client.setLastUpdatedTime(); + return NamingResponseCode.OK; + } + + public long getHeartBeatInterval(String namespaceId, String serviceName, String ip, int port, String cluster) { + // TODO Get heart beat interval from CP metadata + return 5000L; + } + private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) { if (!ipPortBasedClientManager.allClientId().contains(clientId)) { ipPortBasedClientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral)); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java index 422226869ac..e7eeb634a76 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/AbstractClient.java @@ -35,9 +35,9 @@ */ public abstract class AbstractClient implements Client { - private final ConcurrentHashMap publishers = new ConcurrentHashMap<>(16, 0.75f, 1); + protected final ConcurrentHashMap publishers = new ConcurrentHashMap<>(16, 0.75f, 1); - private final ConcurrentHashMap subscribers = new ConcurrentHashMap<>(16, 0.75f, 1); + protected final ConcurrentHashMap subscribers = new ConcurrentHashMap<>(16, 0.75f, 1); protected volatile long lastUpdatedTime; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java index 0949e3568a8..72374861165 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/impl/IpPortBasedClient.java @@ -17,6 +17,13 @@ package com.alibaba.nacos.naming.core.v2.client.impl; import com.alibaba.nacos.naming.core.v2.client.AbstractClient; +import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants; +import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTaskV2; +import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor; + +import java.util.Collection; /** * Nacos naming client based ip and port. @@ -32,9 +39,17 @@ public class IpPortBasedClient extends AbstractClient { private final boolean ephemeral; + private final ClientBeatCheckTaskV2 beatCheckTask; + public IpPortBasedClient(String clientId, boolean ephemeral) { this.ephemeral = ephemeral; this.clientId = clientId; + beatCheckTask = new ClientBeatCheckTaskV2(this); + scheduleCheckTask(); + } + + private void scheduleCheckTask() { + HealthCheckReactor.scheduleCheck(beatCheckTask); } @Override @@ -46,4 +61,18 @@ public String getClientId() { public boolean isEphemeral() { return ephemeral; } + + @Override + public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { + instancePublishInfo.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, System.currentTimeMillis()); + return super.addServiceInstance(service, instancePublishInfo); + } + + public Collection getAllInstancePublishInfo() { + return publishers.values(); + } + + public void destroy() { + HealthCheckReactor.cancelCheck(beatCheckTask); + } } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/IpPortBasedClientManager.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/IpPortBasedClientManager.java index fd15dc819c6..fc88fe3013e 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/IpPortBasedClientManager.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/client/manager/impl/IpPortBasedClientManager.java @@ -67,6 +67,7 @@ public boolean clientDisconnected(String clientId) { return true; } NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); + client.destroy(); return true; } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/MetadataConstants.java b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/MetadataConstants.java new file mode 100644 index 00000000000..c69680476c2 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/core/v2/metadata/MetadataConstants.java @@ -0,0 +1,27 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.core.v2.metadata; + +/** + * Metadata constants. + * + * @author xiweng.yy + */ +public class MetadataConstants { + + public static final String LAST_BEAT_TIME = "lastBeatTime"; +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/BeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/BeatCheckTask.java new file mode 100644 index 00000000000..2ca8ca30e14 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/BeatCheckTask.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.healthcheck; + +/** + * Check and update statues of ephemeral instances, remove them if they have been expired. + * + * @author xiweng.yy + */ +public interface BeatCheckTask extends Runnable { + + /** + * Task key. + * + * @return task key + */ + String taskKey(); +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/BeatProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/BeatProcessor.java new file mode 100644 index 00000000000..b346bfb4d9e --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/BeatProcessor.java @@ -0,0 +1,26 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.healthcheck; + +/** + * Thread to update ephemeral instance triggered by client beat. + * + * @author xiweng.yy + */ +public interface BeatProcessor extends Runnable { + +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java index 253e664dcb5..2d82eab2bf5 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java @@ -38,11 +38,11 @@ import java.util.List; /** - * Check and update statues of ephemeral instances, remove them if they have been expired. + * Client beat check task of service for version 1.x. * * @author nkorange */ -public class ClientBeatCheckTask implements Runnable { +public class ClientBeatCheckTask implements BeatCheckTask { private Service service; @@ -68,6 +68,7 @@ public SwitchDomain getSwitchDomain() { return ApplicationUtils.getBean(SwitchDomain.class); } + @Override public String taskKey() { return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()); } diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTaskV2.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTaskV2.java new file mode 100644 index 00000000000..88781089911 --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTaskV2.java @@ -0,0 +1,137 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.healthcheck; + +import com.alibaba.nacos.api.common.Constants; +import com.alibaba.nacos.api.naming.CommonParams; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.utils.JacksonUtils; +import com.alibaba.nacos.core.utils.ApplicationUtils; +import com.alibaba.nacos.naming.consistency.KeyBuilder; +import com.alibaba.nacos.naming.core.DistroMapper; +import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; +import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent; +import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent; +import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent; +import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants; +import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.misc.GlobalConfig; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.SwitchDomain; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.Collection; + +/** + * Client beat check task of service for version 2.x. + * + * @author nkorange + */ +public class ClientBeatCheckTaskV2 implements BeatCheckTask { + + private final IpPortBasedClient client; + + public ClientBeatCheckTaskV2(IpPortBasedClient client) { + this.client = client; + } + + @JsonIgnore + public DistroMapper getDistroMapper() { + return ApplicationUtils.getBean(DistroMapper.class); + } + + public GlobalConfig getGlobalConfig() { + return ApplicationUtils.getBean(GlobalConfig.class); + } + + public SwitchDomain getSwitchDomain() { + return ApplicationUtils.getBean(SwitchDomain.class); + } + + @Override + public String taskKey() { + return KeyBuilder.buildServiceMetaKey(client.getClientId(), String.valueOf(client.isEphemeral())); + } + + @Override + public void run() { + // TODO add white list like v1 {@code marked} + try { + if (!getSwitchDomain().isHealthCheckEnabled()) { + return; + } + if (!getDistroMapper().responsible(client.getClientId())) { + return; + } + boolean expireInstance = getGlobalConfig().isExpireInstance(); + Collection services = client.getAllPublishedService(); + for (Service each : services) { + InstancePublishInfo instance = client.getInstancePublishInfo(each); + long lastBeatTime = (long) instance.getExtendDatum().get(MetadataConstants.LAST_BEAT_TIME); + if (instance.isHealthy() && isUnhealthy(instance, lastBeatTime)) { + changeHealthyStatus(each, instance, lastBeatTime); + } + if (expireInstance && isExpireInstance(instance, lastBeatTime)) { + deleteIp(each, instance); + } + } + } catch (Exception e) { + Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); + } + + } + + private boolean isUnhealthy(InstancePublishInfo instance, long lastBeatTime) { + long beatTimeout = getTimeout(instance, PreservedMetadataKeys.HEART_BEAT_TIMEOUT, + Constants.DEFAULT_HEART_BEAT_TIMEOUT); + return System.currentTimeMillis() - lastBeatTime > beatTimeout; + } + + private void changeHealthyStatus(Service service, InstancePublishInfo instance, long lastBeatTime) { + instance.setHealthy(false); + String cluster = instance.getExtendDatum().get(CommonParams.CLUSTER_NAME).toString(); + Loggers.EVT_LOG + .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(), + instance.getPort(), cluster, service.getName(), UtilsAndCommons.LOCALHOST_SITE, lastBeatTime); + NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service)); + NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client)); + } + + private boolean isExpireInstance(InstancePublishInfo instance, long lastBeatTime) { + long deleteTimeout = getTimeout(instance, PreservedMetadataKeys.IP_DELETE_TIMEOUT, + Constants.DEFAULT_IP_DELETE_TIMEOUT); + return System.currentTimeMillis() - lastBeatTime > deleteTimeout; + } + + private void deleteIp(Service service, InstancePublishInfo instance) { + Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.toString(), JacksonUtils.toJson(instance)); + client.removeServiceInstance(service); + NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId())); + } + + private long getTimeout(InstancePublishInfo instance, String timeoutKey, long defaultValue) { + // TODO get time out config from CP metadata + Object object = instance.getExtendDatum().get(timeoutKey); + if (null == object) { + return defaultValue; + } + return (long) object; + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java index 213c2719880..9d355565c11 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java @@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit; /** - * Thread to update ephemeral instance triggered by client beat. + * Thread to update ephemeral instance triggered by client beat for v1.x. * * @author nkorange */ -public class ClientBeatProcessor implements Runnable { +public class ClientBeatProcessor implements BeatProcessor { public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15); diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessorV2.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessorV2.java new file mode 100644 index 00000000000..913c31935cb --- /dev/null +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessorV2.java @@ -0,0 +1,72 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.naming.healthcheck; + +import com.alibaba.nacos.api.naming.utils.NamingUtils; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient; +import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent; +import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants; +import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import com.alibaba.nacos.naming.misc.Loggers; +import com.alibaba.nacos.naming.misc.UtilsAndCommons; + +/** + * Thread to update ephemeral instance triggered by client beat for v2.x. + * + * @author nkorange + */ +public class ClientBeatProcessorV2 implements BeatProcessor { + + private final String namespace; + + private final RsInfo rsInfo; + + private final IpPortBasedClient client; + + public ClientBeatProcessorV2(String namespace, RsInfo rsInfo, IpPortBasedClient ipPortBasedClient) { + this.namespace = namespace; + this.rsInfo = rsInfo; + this.client = ipPortBasedClient; + } + + @Override + public void run() { + if (Loggers.EVT_LOG.isDebugEnabled()) { + Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); + } + String ip = rsInfo.getIp(); + int port = rsInfo.getPort(); + String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName()); + String groupName = NamingUtils.getGroupName(rsInfo.getServiceName()); + Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral()); + InstancePublishInfo instance = client.getInstancePublishInfo(service); + if (instance.getIp().equals(ip) && instance.getPort() == port) { + if (Loggers.EVT_LOG.isDebugEnabled()) { + Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); + } + instance.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, System.currentTimeMillis()); + if (!instance.isHealthy()) { + instance.setHealthy(true); + Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", + rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE); + NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service)); + } + } + } +} diff --git a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java index a095fe5951b..552fd32a752 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java @@ -50,7 +50,7 @@ public static ScheduledFuture scheduleCheck(HealthCheckTask task) { * * @param task client beat check task */ - public static void scheduleCheck(ClientBeatCheckTask task) { + public static void scheduleCheck(BeatCheckTask task) { futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS)); } @@ -59,7 +59,7 @@ public static void scheduleCheck(ClientBeatCheckTask task) { * * @param task client beat check task */ - public static void cancelCheck(ClientBeatCheckTask task) { + public static void cancelCheck(BeatCheckTask task) { ScheduledFuture scheduledFuture = futureMap.get(task.taskKey()); if (scheduledFuture == null) { return; diff --git a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/PushExecuteServiceDelegate.java b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/PushExecuteServiceDelegate.java index 0155b38f930..e4d3cccd066 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/push/v2/PushExecuteServiceDelegate.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/push/v2/PushExecuteServiceDelegate.java @@ -25,6 +25,7 @@ * * @author xiweng.yy */ +@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule") @Component public class PushExecuteServiceDelegate implements PushExecuteService { diff --git a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java index fd17cb63870..505aadcf2f9 100644 --- a/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java +++ b/naming/src/main/java/com/alibaba/nacos/naming/remote/handler/ServiceQueryRequestHandler.java @@ -17,24 +17,16 @@ package com.alibaba.nacos.naming.remote.handler; import com.alibaba.nacos.api.exception.NacosException; -import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ServiceInfo; import com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest; import com.alibaba.nacos.api.naming.remote.response.QueryServiceResponse; import com.alibaba.nacos.api.remote.request.RequestMeta; -import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.core.remote.RequestHandler; import com.alibaba.nacos.naming.core.v2.index.ServiceStorage; import com.alibaba.nacos.naming.core.v2.pojo.Service; import com.alibaba.nacos.naming.utils.ServiceUtil; import org.springframework.stereotype.Component; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - /** * Nacos query instances request handler. *