Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router plugin: interceptor with xDS router #1621

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sermant-plugins/sermant-router/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
router.plugin:
# whether compatible with the sermant-springboot-registry plugin
enabled-registry-plugin-adaptation: false
# whether to use xds route
enabled-xds-route: false
# whether to use secure protocol to invoke spring cloud downstream service with xds route, example: http or https
enabled-springcloud-xds-route-secure: false
# Whether to use request information for routing
use-request-router: false
# Use request information as tags when routing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ public class RouterConfig implements PluginConfig {
@ConfigFieldKey("enabled-registry-plugin-adaptation")
private boolean enabledRegistryPluginAdaptation;

/**
* whether to use xds route
*/
@ConfigFieldKey("enabled-xds-route")
private boolean enabledXdsRoute;

/**
* whether to use secure protocol to invoke downstream service with xds route, example: http or https
*/
@ConfigFieldKey("enabled-springcloud-xds-route-secure")
private boolean enabledSpringCloudXdsRouteSecure;

/**
* whether to use the request information for routing
*/
Expand Down Expand Up @@ -197,4 +209,20 @@ public boolean isEnabledPreviousRule() {
public void setEnabledPreviousRule(boolean enabledPreviousRule) {
this.enabledPreviousRule = enabledPreviousRule;
}

public boolean isEnabledXdsRoute() {
return enabledXdsRoute;
}

public void setEnabledXdsRoute(boolean enabledXdsRoute) {
this.enabledXdsRoute = enabledXdsRoute;
}

public boolean isEnabledSpringCloudXdsRouteSecure() {
return enabledSpringCloudXdsRouteSecure;
}

public void setEnabledSpringCloudXdsRouteSecure(boolean enabledSpringCloudXdsRouteSecure) {
this.enabledSpringCloudXdsRouteSecure = enabledSpringCloudXdsRouteSecure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public BaseRegistryPluginAdaptationDeclarer() {

@Override
public boolean isEnabled() {
return routerConfig.isEnabledRegistryPluginAdaptation();
return routerConfig.isEnabledXdsRoute() || routerConfig.isEnabledRegistryPluginAdaptation();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class OkHttp3ClientDeclarer extends BaseRegistryPluginAdaptationDeclarer
* The fully qualified name of the enhanced okhttp request
*/
private static final String[] ENHANCE_CLASSES = {
"okhttp3.RealCall"
"okhttp3.RealCall",
"okhttp3.internal.connection.RealCall"
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.router.spring.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.router.spring.interceptor.OkHttpClientInterceptorChainInterceptor;

/**
* For OKHTTP requests, modify the URL of request
*
* @author daizhenyu
* @since 2024-09-06
*/
public class OkHttpClientInterceptorChainDeclarer extends BaseRegistryPluginAdaptationDeclarer {
private static final String ENHANCE_CLASSES =
"com.squareup.okhttp.Call$ApplicationInterceptorChain";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASSES);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals("proceed"),
new OkHttpClientInterceptorChainInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@

import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.plugin.service.PluginServiceManager;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.request.RequestTag;
import io.sermant.router.common.utils.CollectionUtils;
import io.sermant.router.common.utils.ReflectUtils;
import io.sermant.router.common.utils.ThreadLocalUtils;
import io.sermant.router.common.xds.XdsRouterHandler;
import io.sermant.router.spring.service.LoadBalancerService;
import io.sermant.router.spring.utils.BaseHttpRouterUtils;
import io.sermant.router.spring.utils.SpringRouterUtils;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -36,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;

Expand All @@ -50,28 +57,37 @@ public class BaseLoadBalancerInterceptor extends AbstractInterceptor {

private final boolean canLoadZuul;

private final RouterConfig routerConfig;

/**
* Constructor
*/
public BaseLoadBalancerInterceptor() {
loadBalancerService = PluginServiceManager.getPluginService(LoadBalancerService.class);
canLoadZuul = canLoadZuul();
routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);
}

@Override
public ExecuteContext before(ExecuteContext context) {
Object object = context.getObject();
if (object instanceof BaseLoadBalancer) {
List<Object> serverList = getServerList(context.getMethod().getName(), object);
if (CollectionUtils.isEmpty(serverList)) {
return context;
}
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) object;
String name = loadBalancer.getName();
RequestData requestData = getRequestData().orElse(null);
List<Object> targetInstances = loadBalancerService.getTargetInstances(name, serverList, requestData);
context.skip(Collections.unmodifiableList(targetInstances));
if (!(object instanceof BaseLoadBalancer)) {
return context;
}
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) object;
String name = loadBalancer.getName();
RequestData requestData = getRequestData().orElse(null);

if (handleXdsRouterAndUpdateServiceInstance(name, requestData, context)) {
return context;
}
List<Object> serverList = getServerList(context.getMethod().getName(), object);
if (CollectionUtils.isEmpty(serverList)) {
return context;
}

List<Object> targetInstances = loadBalancerService.getTargetInstances(name, serverList, requestData);
context.skip(Collections.unmodifiableList(targetInstances));
return context;
}

Expand Down Expand Up @@ -104,6 +120,7 @@ private Optional<RequestData> getRequestData() {
header.putAll(requestTag.getTag());
}
HttpServletRequest request = context.getRequest();
String scheme = request.getScheme();
Enumeration<?> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = (String) headerNames.nextElement();
Expand Down Expand Up @@ -131,4 +148,22 @@ private boolean canLoadZuul() {
}
return true;
}

private boolean handleXdsRouterAndUpdateServiceInstance(String serviceName, RequestData requestData,
ExecuteContext context) {
if (requestData == null || (!routerConfig.isEnabledXdsRoute())) {
return false;
}

// use xds route to find service instances
Set<ServiceInstance> serviceInstanceByXdsRoute = XdsRouterHandler.INSTANCE
.getServiceInstanceByXdsRoute(serviceName, requestData.getPath(),
BaseHttpRouterUtils.processHeaders(requestData.getTag()));
if (CollectionUtils.isEmpty(serviceInstanceByXdsRoute)) {
return false;
}
context.skip(Collections.unmodifiableList(SpringRouterUtils
.getSpringCloudServiceInstanceByXds(serviceInstanceByXdsRoute)));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,31 @@

package io.sermant.router.spring.interceptor;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.core.utils.LogUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.router.common.config.RouterConfig;
import io.sermant.router.common.request.RequestData;
import io.sermant.router.common.utils.CollectionUtils;
import io.sermant.router.common.utils.FlowContextUtils;
import io.sermant.router.common.utils.ThreadLocalUtils;
import io.sermant.router.spring.utils.RequestInterceptorUtils;
import io.sermant.router.spring.utils.BaseHttpRouterUtils;

import org.apache.http.Header;
import org.apache.http.HttpRequest;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.HttpRequestBase;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* HTTP interception only for version 4. x
Expand All @@ -40,6 +49,10 @@
* @since 2022-10-25
*/
public class HttpClient4xInterceptor extends MarkInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger();

private RouterConfig routerConfig = PluginConfigManager.getPluginConfig(RouterConfig.class);

/**
* Pre-trigger point
*
Expand All @@ -50,31 +63,34 @@ public class HttpClient4xInterceptor extends MarkInterceptor {
@Override
public ExecuteContext doBefore(ExecuteContext context) throws Exception {
LogUtils.printHttpRequestBeforePoint(context);
Object httpRequestObject = context.getArguments()[1];
if (httpRequestObject instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) httpRequestObject;
final Optional<URI> optionalUri = RequestInterceptorUtils.formatUri(httpRequest.getRequestLine().getUri());
if (!optionalUri.isPresent()) {
return context;
}
URI uri = optionalUri.get();
if (StringUtils.isBlank(FlowContextUtils.getTagName())) {
return context;
}
Header[] headers = httpRequest.getHeaders(FlowContextUtils.getTagName());
Map<String, List<String>> flowTags = new HashMap<>();
if (headers != null && headers.length > 0) {
for (Header header : headers) {
String headerValue = header.getValue();
Map<String, List<String>> stringListMap = FlowContextUtils.decodeTags(headerValue);
flowTags.putAll(stringListMap);
}
}
if (flowTags.size() > 0) {
ThreadLocalUtils.setRequestData(new RequestData(
flowTags, uri.getPath(), httpRequest.getRequestLine().getMethod()));
}
Object[] arguments = context.getArguments();

Object httpRequestObject = arguments[1];
if (!(httpRequestObject instanceof HttpRequestBase)) {
return context;
}
final HttpRequestBase httpRequest = (HttpRequestBase) httpRequestObject;

handleXdsRouterAndUpdateHttpRequest(arguments);

if (StringUtils.isBlank(FlowContextUtils.getTagName())) {
return context;
}
Header[] headers = httpRequest.getHeaders(FlowContextUtils.getTagName());
Map<String, List<String>> flowTags = new HashMap<>();
if (headers == null || headers.length == 0) {
return context;
}
for (Header header : headers) {
String headerValue = header.getValue();
Map<String, List<String>> stringListMap = FlowContextUtils.decodeTags(headerValue);
flowTags.putAll(stringListMap);
}
if (CollectionUtils.isEmpty(flowTags)) {
return context;
}
ThreadLocalUtils.setRequestData(new RequestData(
flowTags, httpRequest.getURI().getPath(), httpRequest.getRequestLine().getMethod()));
return context;
}

Expand All @@ -98,4 +114,38 @@ public ExecuteContext onThrow(ExecuteContext context) {
LogUtils.printHttpRequestOnThrowPoint(context);
return context;
}

private Map<String, String> getHeaders(HttpRequestBase httpRequest) {
Map<String, String> headerMap = new HashMap<>();
for (Header header : httpRequest.getAllHeaders()) {
headerMap.putIfAbsent(header.getName(), header.getValue());
}
return headerMap;
}

private void handleXdsRouterAndUpdateHttpRequest(Object[] arguments) {
if (!routerConfig.isEnabledXdsRoute()) {
return;
}
HttpRequestBase httpRequest = (HttpRequestBase) arguments[1];
URI uri = httpRequest.getURI();
String host = uri.getHost();
if (!BaseHttpRouterUtils.isXdsRouteRequired(host)) {
return;
}

// use xds route to find a service instance, and modify url by it
Optional<ServiceInstance> serviceInstanceOptional = BaseHttpRouterUtils
.chooseServiceInstanceByXds(host.split("\\.")[0], uri.getPath(), getHeaders(httpRequest));
if (!serviceInstanceOptional.isPresent()) {
return;
}
ServiceInstance instance = serviceInstanceOptional.get();
try {
httpRequest.setURI(new URI(BaseHttpRouterUtils.rebuildUrlByXdsServiceInstance(uri, instance)));
arguments[0] = new HttpHost(instance.getHost(), instance.getPort());
} catch (URISyntaxException e) {
LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage());
}
}
}
Loading
Loading