Skip to content

Commit

Permalink
Add heart check for instance registered by http client and openAPI (#…
Browse files Browse the repository at this point in the history
…3860)

* Add heart check

* For checkstyle and pmd
  • Loading branch information
KomachiSion authored Sep 17, 2020
1 parent 4b31f89 commit 1c935d1
Show file tree
Hide file tree
Showing 19 changed files with 453 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.alibaba.nacos.auth.parser.ResourceParser;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.code.ControllerMethodsCache;
import com.alibaba.nacos.core.remote.RequestFilter;
import com.alibaba.nacos.core.remote.AbstractRequestFilter;
import com.alibaba.nacos.core.utils.Loggers;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -46,7 +46,7 @@
* @version $Id: RemoteRequestAuthFilter.java, v 0.1 2020年09月14日 12:38 PM liuzunfei Exp $
*/
@Component
public class RemoteRequestAuthFilter extends RequestFilter {
public class RemoteRequestAuthFilter extends AbstractRequestFilter {

@Autowired
private AuthConfigs authConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
* @author liuzunfei
* @version $Id: RequestFilter.java, v 0.1 2020年09月14日 11:46 AM liuzunfei Exp $
*/
public abstract class RequestFilter {
public abstract class AbstractRequestFilter {

@Autowired
private RequestFilters requestFilters;

public RequestFilter() {
public AbstractRequestFilter() {
}

@PostConstruct
Expand All @@ -45,8 +45,9 @@ public void init() {
/**
* filter request.
*
* @param request request.
* @param meta request meta.
* @param request request
* @param meta request meta
* @param handlerClazz handler class
* @return response
*/
protected abstract Response filter(Request request, RequestMeta meta, Class handlerClazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
@Service
public class RequestFilters {

List<RequestFilter> filters = new ArrayList<RequestFilter>();
List<AbstractRequestFilter> filters = new ArrayList<AbstractRequestFilter>();

public void registerFilter(RequestFilter requestFilter) {
public void registerFilter(AbstractRequestFilter requestFilter) {
filters.add(requestFilter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void init() {
* @throws NacosException nacos exception when handle request has problem.
*/
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
for (RequestFilter filter : requestFilters.filters) {
for (AbstractRequestFilter filter : requestFilters.filters) {
Response filterResult = filter.filter(request, meta, this.getClass());
if (filterResult != null && !filterResult.isSuccess()) {
return filterResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.auth.annotation.Secured;
import com.alibaba.nacos.auth.common.ActionTypes;
Expand Down Expand Up @@ -73,6 +71,7 @@
*/
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
@SuppressWarnings("PMD.RemoveCommentedCodeRule")
public class InstanceController {

@Autowired
Expand Down Expand Up @@ -365,48 +364,52 @@ public ObjectNode beat(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}

Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());

serviceManager.registerInstance(namespaceId, serviceName, instance);
}

Service service = serviceManager.getService(namespaceId, serviceName);

if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
service.processClientBeat(clientBeat);

result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
// Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
//
// if (instance == null) {
// if (clientBeat == null) {
// result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
// return result;
// }
//
// Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
// + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
//
// instance = new Instance();
// instance.setPort(clientBeat.getPort());
// instance.setIp(clientBeat.getIp());
// instance.setWeight(clientBeat.getWeight());
// instance.setMetadata(clientBeat.getMetadata());
// instance.setClusterName(clusterName);
// instance.setServiceName(serviceName);
// instance.setInstanceId(instance.getInstanceId());
// instance.setEphemeral(clientBeat.isEphemeral());
//
// serviceManager.registerInstance(namespaceId, serviceName, instance);
// }
//
// Service service = serviceManager.getService(namespaceId, serviceName);
//
// if (service == null) {
// throw new NacosException(NacosException.SERVER_ERROR,
// "service not found: " + serviceName + "@" + namespaceId);
// }
// if (clientBeat == null) {
// clientBeat = new RsInfo();
// clientBeat.setIp(ip);
// clientBeat.setPort(port);
// clientBeat.setCluster(clusterName);
// }
// service.processClientBeat(clientBeat);
// result.put(CommonParams.CODE, NamingResponseCode.OK);
// if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
// result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
// }
int resultCode = instanceService.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat);
result.put(CommonParams.CODE, resultCode);
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
instanceService.getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@

package com.alibaba.nacos.naming.core;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.IpPortBasedClientManager;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.service.ClientOperationService;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessorV2;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.ServiceUtil;
Expand Down Expand Up @@ -109,6 +115,63 @@ public ServiceInfo listInstance(String namespaceId, String serviceName, Subscrib
return ServiceUtil.filterInstances(serviceInfo, cluster, healthOnly);
}

/**
* Handle beat request.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ip ip of instance
* @param port port of instance
* @param cluster cluster of instance
* @param clientBeat client beat info
* @return result code
* @throws NacosException nacos exception when service non-exist and client beat info is null
*/
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
RsInfo clientBeat) throws NacosException {
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
String clientId = ip + ":" + port;
IpPortBasedClient client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId);
if (null == client) {
if (null == clientBeat) {
return NamingResponseCode.RESOURCE_NOT_FOUND;
}
Instance instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clientBeat.getCluster());
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
registerInstance(namespaceId, serviceName, instance);
client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId);
}
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped);
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (null == clientBeat) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(cluster);
clientBeat.setServiceName(serviceName);
}
ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
HealthCheckReactor.scheduleNow(beatProcessor);
client.setLastUpdatedTime();
return NamingResponseCode.OK;
}

public long getHeartBeatInterval(String namespaceId, String serviceName, String ip, int port, String cluster) {
// TODO Get heart beat interval from CP metadata
return 5000L;
}

private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) {
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
ipPortBasedClientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
*/
public abstract class AbstractClient implements Client {

private final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

private final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);

protected volatile long lastUpdatedTime;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
package com.alibaba.nacos.naming.core.v2.client.impl;

import com.alibaba.nacos.naming.core.v2.client.AbstractClient;
import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTaskV2;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;

import java.util.Collection;

/**
* Nacos naming client based ip and port.
Expand All @@ -32,9 +39,17 @@ public class IpPortBasedClient extends AbstractClient {

private final boolean ephemeral;

private final ClientBeatCheckTaskV2 beatCheckTask;

public IpPortBasedClient(String clientId, boolean ephemeral) {
this.ephemeral = ephemeral;
this.clientId = clientId;
beatCheckTask = new ClientBeatCheckTaskV2(this);
scheduleCheckTask();
}

private void scheduleCheckTask() {
HealthCheckReactor.scheduleCheck(beatCheckTask);
}

@Override
Expand All @@ -46,4 +61,18 @@ public String getClientId() {
public boolean isEphemeral() {
return ephemeral;
}

@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
instancePublishInfo.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, System.currentTimeMillis());
return super.addServiceInstance(service, instancePublishInfo);
}

public Collection<InstancePublishInfo> getAllInstancePublishInfo() {
return publishers.values();
}

public void destroy() {
HealthCheckReactor.cancelCheck(beatCheckTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public boolean clientDisconnected(String clientId) {
return true;
}
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
client.destroy();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.metadata;

/**
* Metadata constants.
*
* @author xiweng.yy
*/
public class MetadataConstants {

public static final String LAST_BEAT_TIME = "lastBeatTime";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.healthcheck;

/**
* Check and update statues of ephemeral instances, remove them if they have been expired.
*
* @author xiweng.yy
*/
public interface BeatCheckTask extends Runnable {

/**
* Task key.
*
* @return task key
*/
String taskKey();
}
Loading

0 comments on commit 1c935d1

Please sign in to comment.