Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor distro filter to support route by ip port #3881

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.alibaba.nacos.naming.core.v2.ServiceManager;
import com.alibaba.nacos.naming.core.v2.client.Client;
import com.alibaba.nacos.naming.core.v2.client.ClientSyncData;
import com.alibaba.nacos.naming.core.v2.client.ClientSyncDatumSnapshot;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent;
Expand Down Expand Up @@ -153,9 +154,9 @@ public boolean processVerifyData(DistroData distroData) {

@Override
public boolean processSnapshot(DistroData distroData) {
List<ClientSyncData> snapshot = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), List.class);
for (ClientSyncData each : snapshot) {
ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
for (ClientSyncData each : snapshot.getClientSyncDataList()) {
handlerClientSyncData(each);
}
return true;
Expand All @@ -173,14 +174,16 @@ public DistroData getDistroData(DistroKey distroKey) {

@Override
public DistroData getDatumSnapshot() {
List<ClientSyncData> snapshot = new LinkedList<>();
List<ClientSyncData> datum = new LinkedList<>();
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client) {
continue;
}
snapshot.add(client.generateSyncData());
datum.add(client.generateSyncData());
}
ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
snapshot.setClientSyncDataList(datum);
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public boolean responsible(Cluster cluster, Instance instance) {
}

/**
* Judge whether current server is responsible for input service.
* Judge whether current server is responsible for input tag.
*
* @param serviceName service name
* @param responsibleTag responsible tag, serviceName for v1 and ip:port for v2
* @return true if input service is response, otherwise false
*/
public boolean responsible(String serviceName) {
public boolean responsible(String responsibleTag) {
final List<String> servers = healthyList;

if (!switchDomain.isDistroEnabled() || ApplicationUtils.getStandaloneMode()) {
Expand All @@ -98,25 +98,25 @@ public boolean responsible(String serviceName) {
return true;
}

int target = distroHash(serviceName) % servers.size();
int target = distroHash(responsibleTag) % servers.size();
return target >= index && target <= lastIndex;
}

/**
* Calculate which other server response input service.
* Calculate which other server response input tag.
*
* @param serviceName service name
* @param responsibleTag responsible tag, serviceName for v1 and ip:port for v2
* @return server which response input service
*/
public String mapSrv(String serviceName) {
public String mapSrv(String responsibleTag) {
final List<String> servers = healthyList;

if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
return ApplicationUtils.getLocalAddress();
}

try {
int index = distroHash(serviceName) % servers.size();
int index = distroHash(responsibleTag) % servers.size();
return servers.get(index);
} catch (Throwable e) {
Loggers.SRV_LOG.warn("[NACOS-DISTRO] distro mapper failed, return localhost: " + ApplicationUtils
Expand All @@ -125,8 +125,8 @@ public String mapSrv(String serviceName) {
}
}

private int distroHash(String serviceName) {
return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
private int distroHash(String responsibleTag) {
return Math.abs(responsibleTag.hashCode() % Integer.MAX_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ public ServiceInfo listInstance(String namespaceId, String serviceName, Subscrib
clientOperationService.subscribeService(service, subscriber, subscriber.getAddrStr());
}
ServiceInfo serviceInfo = serviceStorage.getData(service);
return ServiceUtil.filterInstances(serviceInfo, cluster, healthOnly);
ServiceInfo result = ServiceUtil.filterInstances(serviceInfo, cluster, healthOnly);
// adapt for v1.x sdk
result.setName(NamingUtils.getGroupedName(result.getName(), result.getGroupName()));
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.core.v2.client;

import java.util.LinkedList;
import java.util.List;

/**
* Client sync datum snapshot.
*
* @author xiweng.yy
*/
public class ClientSyncDatumSnapshot {

private List<ClientSyncData> clientSyncDataList = new LinkedList<>();

public List<ClientSyncData> getClientSyncDataList() {
return clientSyncDataList;
}

public void setClientSyncDataList(List<ClientSyncData> clientSyncDataList) {
this.clientSyncDataList = clientSyncDataList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public interface ClientManager {
* verify client.
*
* @param clientId client id
* @return true if client is valid, otherwise is false.
*/
void verifyClient(String clientId);
boolean verifyClient(String clientId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public void verifyClient(String clientId) {
getClientManagerById(clientId).verifyClient(clientId);
public boolean verifyClient(String clientId) {
return getClientManagerById(clientId).verifyClient(clientId);
}

private ClientManager getClientManagerById(String clientId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public void verifyClient(String clientId) {
public boolean verifyClient(String clientId) {
ConnectionBasedClient client = clients.get(clientId);
if (null != client) {
client.setLastRenewTime();
} else {
// TODO get client from source
return true;
}
return false;
}

private static class ExpiredClientCleaner implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
import com.alibaba.nacos.naming.core.v2.metadata.MetadataConstants;
import com.alibaba.nacos.naming.core.v2.pojo.InstancePublishInfo;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -92,9 +94,18 @@ public boolean isResponsibleClient(Client client) {
}

@Override
public void verifyClient(String clientId) {
public boolean verifyClient(String clientId) {
IpPortBasedClient client = clients.get(clientId);
// TODO check whether client is newest by updated time
// TODO renew beat time async
long currentTime = System.currentTimeMillis();
if (null != client) {
for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
each.getExtendDatum().put(MetadataConstants.LAST_BEAT_TIME, currentTime);
}
client.setLastUpdatedTime();
return true;
}
return false;
}

private static class ExpiredClientCleaner implements Runnable {
Expand Down
43 changes: 12 additions & 31 deletions naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@

package com.alibaba.nacos.naming.web;

import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.core.code.ControllerMethodsCache;
import com.alibaba.nacos.core.utils.OverrideParameterRequestWrapper;
import com.alibaba.nacos.core.utils.ReuseHttpRequest;
import com.alibaba.nacos.core.utils.ReuseHttpServletRequest;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.DistroMapper;
Expand Down Expand Up @@ -69,15 +66,17 @@ public class DistroFilter implements Filter {
@Autowired
private ControllerMethodsCache controllerMethodsCache;

@Autowired
private DistroTagGenerator distroTagGenerator;

@Override
public void init(FilterConfig filterConfig) throws ServletException {

}

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
ReuseHttpServletRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
HttpServletResponse resp = (HttpServletResponse) servletResponse;

String urlString = req.getRequestURI();
Expand All @@ -87,35 +86,17 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
}

try {
String path = new URI(req.getRequestURI()).getPath();
String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
// For client under 0.8.0:
if (StringUtils.isBlank(serviceName)) {
serviceName = req.getParameter("dom");
}

if (StringUtils.isNotBlank(serviceName)) {
serviceName = serviceName.trim();
}
Method method = controllerMethodsCache.getMethod(req);

String path = new URI(req.getRequestURI()).getPath();
if (method == null) {
throw new NoSuchMethodException(req.getMethod() + " " + path);
}

String groupName = req.getParameter(CommonParams.GROUP_NAME);
if (StringUtils.isBlank(groupName)) {
groupName = Constants.DEFAULT_GROUP;
}

// use groupName@@serviceName as new service name:
String groupedServiceName = serviceName;
if (StringUtils.isNotBlank(serviceName) && !serviceName.contains(Constants.SERVICE_INFO_SPLITER)) {
groupedServiceName = groupName + Constants.SERVICE_INFO_SPLITER + serviceName;
}
String distroTag = distroTagGenerator.getResponsibleTag(req);

// proxy request to other server if necessary:
if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName)) {
if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(distroTag)) {

String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);

Expand All @@ -127,7 +108,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
return;
}

final String targetServer = distroMapper.mapSrv(groupedServiceName);
final String targetServer = distroMapper.mapSrv(distroTag);

List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
Expand All @@ -147,12 +128,12 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName)
+ urlString);
Loggers.SRV_LOG
.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
}
} else {
OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupedServiceName);
OverrideParameterRequestWrapper requestWrapper = distroTagGenerator
.wrapperRequestWithTag(req, distroTag);
filterChain.doFilter(requestWrapper, resp);
}
} catch (AccessControlException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.web;

import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.utils.OverrideParameterRequestWrapper;
import com.alibaba.nacos.core.utils.ReuseHttpServletRequest;

/**
* Distro IP and port tag generator.
*
* @author xiweng.yy
*/
public class DistroIpPortTagGenerator implements DistroTagGenerator {

private static final String PARAMETER_IP = "ip";

private static final String PARAMETER_PORT = "port";

@Override
public String getResponsibleTag(ReuseHttpServletRequest request) {
String ip = request.getParameter(PARAMETER_IP);
String port = request.getParameter(PARAMETER_PORT);
if (StringUtils.isNotBlank(ip)) {
ip = ip.trim();
}
port = StringUtils.isBlank(port) ? "0" : port.trim();
return ip + ":" + port;
}

@Override
public OverrideParameterRequestWrapper wrapperRequestWithTag(ReuseHttpServletRequest request, String tag) {
return OverrideParameterRequestWrapper.buildRequest(request);
}
}
Loading