Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support update instance metadata and sync by raft #4279

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,22 +173,11 @@ public String deregister(HttpServletRequest request) throws Exception {
@PutMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String update(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);

String agent = WebUtils.getUserAgent(request);

ClientInfo clientInfo = new ClientInfo(agent);

if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
serviceManager.updateInstance(namespaceId, serviceName, instance);
} else {
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String groupName = WebUtils.optional(request, CommonParams.GROUP_NAME, Constants.DEFAULT_GROUP);
instanceService.updateInstance(namespaceId, serviceName, groupName, parseInstance(request));
return "ok";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ public interface InstanceOperator {
*/
void removeInstance(String namespaceId, String serviceName, Instance instance) throws NacosException;

/**
* Update instance information. Due to the basic information can't be changed, so this update should only update
* metadata.
*
* @param namespaceId namespace
* @param serviceName service name
* @param groupName group name
* @param instance instance
* @throws NacosException nacos exception when update failed
*/
void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException;

/**
* Get all instance of input service.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,31 @@

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.impl.IpPortBasedClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
import com.alibaba.nacos.naming.core.v2.metadata.InstanceMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataOperateService;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.core.v2.service.ClientOperationService;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.healthcheck.heartbeat.ClientBeatProcessorV2;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.utils.ServiceUtil;

import java.util.Optional;

/**
* Instance service.
*
Expand All @@ -42,17 +51,27 @@
@org.springframework.stereotype.Service
public class InstanceOperatorClientImpl implements InstanceOperator {

private final IpPortBasedClientManager ipPortBasedClientManager;
private final ClientManagerDelegate clientManager;

private final ClientOperationService clientOperationService;

private final ServiceStorage serviceStorage;

public InstanceOperatorClientImpl(IpPortBasedClientManager ipPortBasedClientManager,
ClientOperationService clientOperationService, ServiceStorage serviceStorage) {
this.ipPortBasedClientManager = ipPortBasedClientManager;
private final NamingMetadataOperateService metadataOperateService;

private final NamingMetadataManager metadataManager;

private final SwitchDomain switchDomain;

public InstanceOperatorClientImpl(ClientManagerDelegate clientManager,
ClientOperationService clientOperationService, ServiceStorage serviceStorage,
NamingMetadataOperateService metadataOperateService, NamingMetadataManager metadataManager, SwitchDomain switchDomain) {
this.clientManager = clientManager;
this.clientOperationService = clientOperationService;
this.serviceStorage = serviceStorage;
this.metadataOperateService = metadataOperateService;
this.metadataManager = metadataManager;
this.switchDomain = switchDomain;
}

/**
Expand All @@ -71,7 +90,7 @@ public void registerInstance(String namespaceId, String serviceName, Instance in
@Override
public void removeInstance(String namespaceId, String serviceName, Instance instance) {
String clientId = instance.toInetAddr();
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
if (!clientManager.allClientId().contains(clientId)) {
Loggers.SRV_LOG.warn("remove instance from non-exist client: {}", clientId);
return;
}
Expand All @@ -81,6 +100,24 @@ public void removeInstance(String namespaceId, String serviceName, Instance inst
clientOperationService.deregisterInstance(service, instance, clientId);
}

@Override
public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException {
Service service = Service.newService(namespaceId, groupName, serviceName, instance.isEphemeral());
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + service);
}
metadataOperateService.updateInstanceMetadata(service, instance.getIp(), buildMetadata(instance));
}

private InstanceMetadata buildMetadata(Instance instance) {
InstanceMetadata result = new InstanceMetadata();
result.setEnabled(instance.isEnabled());
result.setWeight(instance.getWeight());
result.getExtendData().putAll(instance.getMetadata());
return result;
}

@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) {
Expand All @@ -104,7 +141,7 @@ public int handleBeat(String namespaceId, String serviceName, String ip, int por
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
String clientId = ip + ":" + port;
IpPortBasedClient client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId);
IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
if (null == client) {
if (null == clientBeat) {
return NamingResponseCode.RESOURCE_NOT_FOUND;
Expand All @@ -119,7 +156,7 @@ public int handleBeat(String namespaceId, String serviceName, String ip, int por
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
registerInstance(namespaceId, serviceName, instance);
client = (IpPortBasedClient) ipPortBasedClientManager.getClient(clientId);
client = (IpPortBasedClient) clientManager.getClient(clientId);
}
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped);
if (!ServiceManager.getInstance().containSingleton(service)) {
Expand All @@ -141,13 +178,24 @@ public int handleBeat(String namespaceId, String serviceName, String ip, int por

@Override
public long getHeartBeatInterval(String namespaceId, String serviceName, String ip, int port, String cluster) {
// TODO Get heart beat interval from CP metadata
return 5000L;
String groupName = NamingUtils.getGroupName(serviceName);
String serviceNameNoGrouped = NamingUtils.getServiceName(serviceName);
Service service = Service.newService(namespaceId, groupName, serviceNameNoGrouped);
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, ip);
if (metadata.isPresent() && metadata.get().getExtendData().containsKey(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
return ConvertUtils.toLong(metadata.get().getExtendData().get(PreservedMetadataKeys.HEART_BEAT_INTERVAL));
}
String clientId = ip + ":" + port;
InstancePublishInfo instance = clientManager.getClient(clientId).getInstancePublishInfo(service);
if (null != instance && instance.getExtendDatum().containsKey(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
return ConvertUtils.toLong(instance.getExtendDatum().get(PreservedMetadataKeys.HEART_BEAT_INTERVAL));
}
return switchDomain.getClientBeatInterval();
}

private void createIpPortClientIfAbsent(String clientId, boolean ephemeral) {
if (!ipPortBasedClientManager.allClientId().contains(clientId)) {
ipPortBasedClientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
if (!clientManager.allClientId().contains(clientId)) {
clientManager.clientConnected(new IpPortBasedClient(clientId, ephemeral));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.healthcheck.RsInfo;
import com.alibaba.nacos.naming.misc.Loggers;
Expand Down Expand Up @@ -99,6 +100,13 @@ public void removeInstance(String namespaceId, String serviceName, Instance inst
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), coreInstance);
}

@Override
public void updateInstance(String namespaceId, String serviceName, String groupName, Instance instance) throws NacosException {
com.alibaba.nacos.naming.core.Instance coreInstance = (com.alibaba.nacos.naming.core.Instance) instance;
String groupedServiceName = NamingUtils.getGroupedName(groupName, serviceName);
serviceManager.updateInstance(namespaceId, groupedServiceName, coreInstance);
}

@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManagerDelegate;
import com.alibaba.nacos.naming.core.v2.metadata.InstanceMetadata;
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.core.v2.pojo.Service;
import com.alibaba.nacos.naming.misc.SwitchDomain;
Expand Down Expand Up @@ -54,57 +57,73 @@ public class ServiceStorage {

private final SwitchDomain switchDomain;

private final NamingMetadataManager metadataManager;

private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;

private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;

private final ConcurrentMap<String, Set<Service>> namespaceServiceIndex;

public ServiceStorage(ClientServiceIndexesManager serviceIndexesManager, ClientManagerDelegate clientManager,
SwitchDomain switchDomain) {
SwitchDomain switchDomain, NamingMetadataManager metadataManager) {
this.serviceIndexesManager = serviceIndexesManager;
this.clientManager = clientManager;
this.switchDomain = switchDomain;
this.metadataManager = metadataManager;
this.serviceDataIndexes = new ConcurrentHashMap<>();
this.serviceClusterIndex = new ConcurrentHashMap<>();
this.namespaceServiceIndex = new ConcurrentHashMap<>();
}

public Set<String> getClusters(Service service) {
return serviceClusterIndex.getOrDefault(service, new HashSet<>());
}

public Collection<Service> getAllServicesOfNamespace(String namespace) {
return namespaceServiceIndex.getOrDefault(namespace, new ConcurrentHashSet<>());
}

public ServiceInfo getData(Service service) {
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
result.setHosts(getAllInstancesFromIndex(service));
serviceDataIndexes.put(service, result);
updateNamespaceIndex(service);
return result;
}

private ServiceInfo emptyServiceInfo(Service service) {
ServiceInfo result = new ServiceInfo();
result.setName(service.getName());
result.setGroupName(service.getGroup());
result.setLastRefTime(System.currentTimeMillis());
result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
List<Instance> instances = new LinkedList<>();
return result;
}

private List<Instance> getAllInstancesFromIndex(Service service) {
List<Instance> result = new LinkedList<>();
Set<String> clusters = new HashSet<>();
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
Instance instance = parseInstance(service, instancePublishInfo.get());
instances.add(instance);
result.add(instance);
clusters.add(instance.getClusterName());
}
}
result.setHosts(instances);
serviceDataIndexes.put(service, result);
// cache clusters of this service
serviceClusterIndex.put(service, clusters);
updateNamespaceIndex(service);
return result;
}

public Set<String> getClusters(Service service) {
return serviceClusterIndex.getOrDefault(service, new HashSet<>());
}

public Collection<Service> getAllServicesOfNamespace(String namespace) {
return namespaceServiceIndex.getOrDefault(namespace, new ConcurrentHashSet<>());
}

private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
Client client = clientManager.getClient(clientId);
if (null == client) {
Expand All @@ -126,6 +145,14 @@ private Instance parseInstance(Service service, InstancePublishInfo instancePubl
instanceMetadata.put(entry.getKey(), entry.getValue().toString());
}
}
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instancePublishInfo.getIp());
if (metadata.isPresent()) {
result.setEnabled(metadata.get().isEnabled());
result.setWeight(metadata.get().getWeight());
for (Map.Entry<String, Object> entry : metadata.get().getExtendData().entrySet()) {
instanceMetadata.put(entry.getKey(), entry.getValue().toString());
}
}
result.setMetadata(instanceMetadata);
result.setEphemeral(service.isEphemeral());
result.setHealthy(instancePublishInfo.isHealthy());
Expand Down
Loading