Skip to content

Commit

Permalink
router plugin: interceptor with xDS router
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed Sep 20, 2024
1 parent ebc79d4 commit d5ee1ec
Show file tree
Hide file tree
Showing 19 changed files with 1,139 additions and 74 deletions.
4 changes: 4 additions & 0 deletions sermant-plugins/sermant-router/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
router.plugin:
# whether compatible with the sermant-springboot-registry plugin
enabled-registry-plugin-adaptation: false
# whether to use xds route
enabled-xds-route: false
# whether to use secure protocol to invoke spring cloud downstream service with xds route, example: http or https
enabled-springcloud-xds-route-secure: false
# Whether to use request information for routing
use-request-router: false
# Use request information as tags when routing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ public class RouterConfig implements PluginConfig {
@ConfigFieldKey("enabled-registry-plugin-adaptation")
private boolean enabledRegistryPluginAdaptation;

/**
* whether to use xds route
*/
@ConfigFieldKey("enabled-xds-route")
private boolean enabledXdsRoute;

/**
* whether to use secure protocol to invoke downstream service with xds route, example: http or https
*/
@ConfigFieldKey("enabled-springcloud-xds-route-secure")
private boolean enabledSpringCloudXdsRouteSecure;

/**
* whether to use the request information for routing
*/
Expand Down Expand Up @@ -197,4 +209,20 @@ public boolean isEnabledPreviousRule() {
public void setEnabledPreviousRule(boolean enabledPreviousRule) {
this.enabledPreviousRule = enabledPreviousRule;
}

public boolean isEnabledXdsRoute() {
return enabledXdsRoute;
}

public void setEnabledXdsRoute(boolean enabledXdsRoute) {
this.enabledXdsRoute = enabledXdsRoute;
}

public boolean isEnabledSpringCloudXdsRouteSecure() {
return enabledSpringCloudXdsRouteSecure;
}

public void setEnabledSpringCloudXdsRouteSecure(boolean enabledSpringCloudXdsRouteSecure) {
this.enabledSpringCloudXdsRouteSecure = enabledSpringCloudXdsRouteSecure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public BaseRegistryPluginAdaptationDeclarer() {

@Override
public boolean isEnabled() {
return routerConfig.isEnabledRegistryPluginAdaptation();
return routerConfig.isEnabledXdsRoute() || routerConfig.isEnabledRegistryPluginAdaptation();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class OkHttp3ClientDeclarer extends BaseRegistryPluginAdaptationDeclarer
* The fully qualified name of the enhanced okhttp request
*/
private static final String[] ENHANCE_CLASSES = {
"okhttp3.RealCall"
"okhttp3.RealCall",
"okhttp3.internal.connection.RealCall"
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.router.spring.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.router.spring.interceptor.OkHttpClientInterceptorChainInterceptor;

/**
* For OKHTTP requests, modify the URL of request
*
* @author daizhenyu
* @since 2024-09-06
*/
public class OkHttpClientInterceptorChainDeclarer extends BaseRegistryPluginAdaptationDeclarer {
private static final String ENHANCE_CLASSES =
"com.squareup.okhttp.Call$ApplicationInterceptorChain";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASSES);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals("proceed"),
new OkHttpClientInterceptorChainInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.plugin.service.PluginServiceManager;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.request.RequestTag;
import io.sermant.router.common.utils.CollectionUtils;
import io.sermant.router.common.utils.ReflectUtils;
import io.sermant.router.common.utils.ThreadLocalUtils;
import io.sermant.router.common.xds.XdsRouterHandler;
import io.sermant.router.spring.service.LoadBalancerService;
import io.sermant.router.spring.utils.BaseHttpRouterUtils;
import io.sermant.router.spring.utils.SpringRouterUtils;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -36,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;

Expand All @@ -50,28 +57,37 @@ public class BaseLoadBalancerInterceptor extends AbstractInterceptor {

private final boolean canLoadZuul;

private final RouterConfig routerConfig;

/**
* Constructor
*/
public BaseLoadBalancerInterceptor() {
loadBalancerService = PluginServiceManager.getPluginService(LoadBalancerService.class);
canLoadZuul = canLoadZuul();
routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);
}

@Override
public ExecuteContext before(ExecuteContext context) {
Object object = context.getObject();
if (object instanceof BaseLoadBalancer) {
List<Object> serverList = getServerList(context.getMethod().getName(), object);
if (CollectionUtils.isEmpty(serverList)) {
return context;
}
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) object;
String name = loadBalancer.getName();
RequestData requestData = getRequestData().orElse(null);
List<Object> targetInstances = loadBalancerService.getTargetInstances(name, serverList, requestData);
context.skip(Collections.unmodifiableList(targetInstances));
if (!(object instanceof BaseLoadBalancer)) {
return context;
}
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) object;
String name = loadBalancer.getName();
RequestData requestData = getRequestData().orElse(null);

if (handleXdsRouterAndUpdateServiceInstance(name, requestData, context)) {
return context;
}
List<Object> serverList = getServerList(context.getMethod().getName(), object);
if (CollectionUtils.isEmpty(serverList)) {
return context;
}

List<Object> targetInstances = loadBalancerService.getTargetInstances(name, serverList, requestData);
context.skip(Collections.unmodifiableList(targetInstances));
return context;
}

Expand Down Expand Up @@ -104,6 +120,7 @@ private Optional<RequestData> getRequestData() {
header.putAll(requestTag.getTag());
}
HttpServletRequest request = context.getRequest();
String scheme = request.getScheme();
Enumeration<?> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = (String) headerNames.nextElement();
Expand Down Expand Up @@ -131,4 +148,22 @@ private boolean canLoadZuul() {
}
return true;
}

private boolean handleXdsRouterAndUpdateServiceInstance(String serviceName, RequestData requestData,
ExecuteContext context) {
if (requestData == null || (!routerConfig.isEnabledXdsRoute())) {
return false;
}

// use xds route to find service instances
Set<ServiceInstance> serviceInstanceByXdsRoute = XdsRouterHandler.INSTANCE
.getServiceInstanceByXdsRoute(serviceName, requestData.getPath(),
BaseHttpRouterUtils.processHeaders(requestData.getTag()));
if (CollectionUtils.isEmpty(serviceInstanceByXdsRoute)) {
return false;
}
context.skip(Collections.unmodifiableList(SpringRouterUtils
.getSpringCloudServiceInstanceByXds(serviceInstanceByXdsRoute)));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,31 @@

package io.sermant.router.spring.interceptor;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.core.utils.LogUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.utils.CollectionUtils;
import io.sermant.router.common.utils.FlowContextUtils;
import io.sermant.router.common.utils.ThreadLocalUtils;
import io.sermant.router.spring.utils.RequestInterceptorUtils;
import io.sermant.router.spring.utils.BaseHttpRouterUtils;

import org.apache.http.Header;
import org.apache.http.HttpRequest;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.HttpRequestBase;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* HTTP interception only for version 4. x
Expand All @@ -40,6 +49,10 @@
* @since 2022-10-25
*/
public class HttpClient4xInterceptor extends MarkInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger();

private RouterConfig routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);

/**
* Pre-trigger point
*
Expand All @@ -50,31 +63,34 @@ public class HttpClient4xInterceptor extends MarkInterceptor {
@Override
public ExecuteContext doBefore(ExecuteContext context) throws Exception {
LogUtils.printHttpRequestBeforePoint(context);
Object httpRequestObject = context.getArguments()[1];
if (httpRequestObject instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) httpRequestObject;
final Optional<URI> optionalUri = RequestInterceptorUtils.formatUri(httpRequest.getRequestLine().getUri());
if (!optionalUri.isPresent()) {
return context;
}
URI uri = optionalUri.get();
if (StringUtils.isBlank(FlowContextUtils.getTagName())) {
return context;
}
Header[] headers = httpRequest.getHeaders(FlowContextUtils.getTagName());
Map<String, List<String>> flowTags = new HashMap<>();
if (headers != null && headers.length > 0) {
for (Header header : headers) {
String headerValue = header.getValue();
Map<String, List<String>> stringListMap = FlowContextUtils.decodeTags(headerValue);
flowTags.putAll(stringListMap);
}
}
if (flowTags.size() > 0) {
ThreadLocalUtils.setRequestData(new RequestData(
flowTags, uri.getPath(), httpRequest.getRequestLine().getMethod()));
}
Object[] arguments = context.getArguments();

Object httpRequestObject = arguments[1];
if (!(httpRequestObject instanceof HttpRequestBase)) {
return context;
}
final HttpRequestBase httpRequest = (HttpRequestBase) httpRequestObject;

handleXdsRouterAndUpdateHttpRequest(arguments);

if (StringUtils.isBlank(FlowContextUtils.getTagName())) {
return context;
}
Header[] headers = httpRequest.getHeaders(FlowContextUtils.getTagName());
Map<String, List<String>> flowTags = new HashMap<>();
if (headers == null || headers.length == 0) {
return context;
}
for (Header header : headers) {
String headerValue = header.getValue();
Map<String, List<String>> stringListMap = FlowContextUtils.decodeTags(headerValue);
flowTags.putAll(stringListMap);
}
if (CollectionUtils.isEmpty(flowTags)) {
return context;
}
ThreadLocalUtils.setRequestData(new RequestData(
flowTags, httpRequest.getURI().getPath(), httpRequest.getRequestLine().getMethod()));
return context;
}

Expand All @@ -98,4 +114,38 @@ public ExecuteContext onThrow(ExecuteContext context) {
LogUtils.printHttpRequestOnThrowPoint(context);
return context;
}

private Map<String, String> getHeaders(HttpRequestBase httpRequest) {
Map<String, String> headerMap = new HashMap<>();
for (Header header : httpRequest.getAllHeaders()) {
headerMap.putIfAbsent(header.getName(), header.getValue());
}
return headerMap;
}

private void handleXdsRouterAndUpdateHttpRequest(Object[] arguments) {
if (!routerConfig.isEnabledXdsRoute()) {
return;
}
HttpRequestBase httpRequest = (HttpRequestBase) arguments[1];
URI uri = httpRequest.getURI();
String host = uri.getHost();
if (!BaseHttpRouterUtils.isXdsRouteRequired(host)) {
return;
}

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(httpRequest));
if (!serviceInstanceOptional.isPresent()) {
return;
}
ServiceInstance instance = serviceInstanceOptional.get();
try {
httpRequest.setURI(new URI(BaseHttpRouterUtils.rebuildUrlByXdsServiceInstance(uri, instance)));
arguments[0] = new HttpHost(instance.getHost(), instance.getPort());
} catch (URISyntaxException e) {
LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage());
}
}
}
Loading

0 comments on commit d5ee1ec

Please sign in to comment.