Skip to content

Commit

Permalink
Sermant流量标签透传插件支持grpc框架,grpc框架版本为1.13+
Browse files Browse the repository at this point in the history
  • Loading branch information
daizhenyu committed Sep 6, 2023
1 parent 5814e18 commit f807a95
Show file tree
Hide file tree
Showing 16 changed files with 545 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
<powermock.version>2.0.9</powermock.version>
<dubbo.version>3.2.0</dubbo.version>
<alibaba.dubbo.version>2.6.12</alibaba.dubbo.version>
<grpc.version>1.52.1</grpc.version>
<protobuf.version>3.18.0</protobuf.version>
<sofa-rpc.version>5.4.0</sofa-rpc.version>
<servicecomb-java-chassis.version>2.6.0</servicecomb-java-chassis.version>
</properties>
Expand Down Expand Up @@ -72,6 +74,18 @@
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.ClientCallImplInterceptor;

/**
* grpc client端Declarer,拦截header参数注入流量标签,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientCallImplDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.internal.ClientCallImpl";

private static final String METHOD_NAMES = "start";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAMES),
new ClientCallImplInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.GrpcServerInterceptor;

/**
* grpc server端declare,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-15
**/
public class GrpcServerDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";

private static final String METHOD_NAME = "build";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new GrpcServerInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.enumeration;

/**
* 特殊的线程池枚举类,该类线程池不会使用新线程执行方法
*
* @author daizhenyu
* @since 2023-09-04
**/
public enum SpecialExecutor {
/**
* grpc的ThreadlessExecutor线程池。
* 对于该线程池,主线程会创建子线程提交线程任务,然后主线程执行。
* ThreadlessExecutor协调两个线程的任务,形成生产消费的模式
*/
THREAD_LESS_EXECUTOR("ThreadlessExecutor"),

/**
* grpc的SynchronizationContext线程池。
* 对于该线程池,并不会使用新的线程执行线程任务,而是在调用该线程池的线程中依次执行线程池队列的任务。
*/
SYNCHRONIZATION_CONTEXT("SynchronizationContext"),

/**
* 用于getSpecialExecutorByName方法寻找不到合适的线程池枚举对象时返回返回值
*/
OTHER_EXECUTORS("otherExecutors");

private final String executorName;

SpecialExecutor(String executorName) {
this.executorName = executorName;
}

public String getExecutorName() {
return this.executorName;
}

/**
* 用于根据线程池名称获取线程池枚举对象
*
* @param name 线程池名称
* @return SpecialExecutor对象
*/
public static SpecialExecutor getSpecialExecutorByName(String name) {
for (SpecialExecutor value : values()) {
if (value.getExecutorName().equals(name)) {
return value;
}
}
return OTHER_EXECUTORS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.huaweicloud.sermant.core.utils.tag.TrafficData;
import com.huaweicloud.sermant.core.utils.tag.TrafficTag;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage;
import com.huaweicloud.sermant.tag.transmission.wrapper.CallableWrapper;
import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableAndCallableWrapper;
import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableWrapper;
Expand Down Expand Up @@ -61,54 +62,59 @@ public ExecuteContext before(ExecuteContext context) {
if (trafficTag == null && trafficData == null) {
return context;
}

TrafficMessage trafficMessage = new TrafficMessage(trafficTag, trafficData);
Object executorObject = context.getObject();
String executorName = executorObject.getClass().getSimpleName();
Object argument = arguments[0];
if (argument instanceof RunnableAndCallableWrapper || argument instanceof RunnableWrapper
|| argument instanceof CallableWrapper) {
return context;
}
if (argument instanceof Runnable && argument instanceof Callable) {
return buildRunnableAndCallableWrapper(context, arguments, trafficTag, trafficData, argument);
return buildRunnableAndCallableWrapper(context, arguments, trafficMessage, argument, executorName);
}
if (argument instanceof Runnable) {
return buildRunnableWrapper(context, arguments, trafficTag, trafficData, argument);
return buildRunnableWrapper(context, arguments, trafficMessage, argument, executorName);
}
if (argument instanceof Callable) {
return buildCallableWrapper(context, arguments, trafficTag, trafficData, argument);
return buildCallableWrapper(context, arguments, trafficMessage, argument, executorName);
}
return context;
}

private ExecuteContext buildCallableWrapper(ExecuteContext context, Object[] arguments, TrafficTag trafficTag,
TrafficData trafficData, Object argument) {
log(argument, trafficTag, trafficData, CallableWrapper.class.getCanonicalName());
arguments[0] = new CallableWrapper<>((Callable<?>) argument, trafficTag, trafficData,
cannotTransmit);
private ExecuteContext buildCallableWrapper(ExecuteContext context, Object[] arguments,
TrafficMessage trafficMessage,
Object argument,
String executorName) {
log(argument, trafficMessage, CallableWrapper.class.getCanonicalName());
arguments[0] = new CallableWrapper<>((Callable<?>) argument, trafficMessage,
cannotTransmit, executorName);
return context;
}

private ExecuteContext buildRunnableWrapper(ExecuteContext context, Object[] arguments, TrafficTag trafficTag,
TrafficData trafficData, Object argument) {
log(argument, trafficTag, trafficData, RunnableWrapper.class.getCanonicalName());
arguments[0] = new RunnableWrapper<>((Runnable) argument, trafficTag, trafficData,
cannotTransmit);
private ExecuteContext buildRunnableWrapper(ExecuteContext context, Object[] arguments,
TrafficMessage trafficMessage,
Object argument,
String executorName) {
log(argument, trafficMessage, RunnableWrapper.class.getCanonicalName());
arguments[0] = new RunnableWrapper<>((Runnable) argument, trafficMessage,
cannotTransmit, executorName);
return context;
}

private ExecuteContext buildRunnableAndCallableWrapper(ExecuteContext context, Object[] arguments,
TrafficTag trafficTag,
TrafficData trafficData, Object argument) {
log(argument, trafficTag, trafficData, RunnableAndCallableWrapper.class.getCanonicalName());
TrafficMessage trafficMessage, Object argument, String executorName) {
log(argument, trafficMessage, RunnableAndCallableWrapper.class.getCanonicalName());
arguments[0] = new RunnableAndCallableWrapper<>((Runnable) argument, (Callable<?>) argument,
trafficTag, trafficData, cannotTransmit);
trafficMessage, cannotTransmit, executorName);
return context;
}

private void log(Object argument, TrafficTag trafficTag, TrafficData trafficData, String wrapperClassName) {
private void log(Object argument, TrafficMessage trafficMessage, String wrapperClassName) {
LOGGER.log(Level.FINE, "Class name is {0}, hash code is {1}, trafficTag is {2}, "
+ "trafficData is {3}, will be converted to {4}.",
new Object[]{argument.getClass().getName(), Integer.toHexString(argument.hashCode()),
trafficTag, trafficData, wrapperClassName});
trafficMessage.getTrafficTag(), trafficMessage.getTrafficData(), wrapperClassName});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.utils.CollectionUtils;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractClientInterceptor;

import io.grpc.Metadata;

import java.util.List;

/**
* grpc使用DynamicMessage调用服务端,拦截header参数注入流量标签
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientCallImplInterceptor extends AbstractClientInterceptor<Metadata> {
@Override
protected ExecuteContext doBefore(ExecuteContext context) {
Object[] arguments = context.getArguments();

// 被拦截方法的入参数量为2
if (arguments == null || arguments.length <= 1) {
return context;
}
Object metadataObject = arguments[1];
if (metadataObject instanceof Metadata) {
injectTrafficTag2Carrier((Metadata) metadataObject);
}
return context;
}

@Override
protected ExecuteContext doAfter(ExecuteContext context) {
return context;
}

@Override
protected void injectTrafficTag2Carrier(Metadata header) {
for (String key : tagTransmissionConfig.getTagKeys()) {
List<String> values = TrafficUtils.getTrafficTag().getTag().get(key);
if (CollectionUtils.isEmpty(values)) {
continue;
}
header.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), values.get(0));
}
}
}
Loading

0 comments on commit f807a95

Please sign in to comment.