From f807a957997f8103105541295b95dfc0dc006fcc Mon Sep 17 00:00:00 2001 From: daizhenyu <1449308021@qq.com> Date: Wed, 6 Sep 2023 16:07:14 +0800 Subject: [PATCH] =?UTF-8?q?Sermant=E6=B5=81=E9=87=8F=E6=A0=87=E7=AD=BE?= =?UTF-8?q?=E9=80=8F=E4=BC=A0=E6=8F=92=E4=BB=B6=E6=94=AF=E6=8C=81grpc?= =?UTF-8?q?=E6=A1=86=E6=9E=B6=EF=BC=8Cgrpc=E6=A1=86=E6=9E=B6=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E4=B8=BA1.13+?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tag-transmission-plugin/pom.xml | 14 ++++ .../rpc/grpc/ClientCallImplDeclarer.java | 46 +++++++++++ .../rpc/grpc/GrpcServerDeclarer.java | 47 +++++++++++ .../enumeration/SpecialExecutor.java | 68 ++++++++++++++++ .../AbstractExecutorInterceptor.java | 46 ++++++----- .../rpc/grpc/ClientCallImplInterceptor.java | 65 +++++++++++++++ .../rpc/grpc/GrpcServerInterceptor.java | 60 ++++++++++++++ .../rpc/grpc/ServerHeaderInterceptor.java | 80 +++++++++++++++++++ .../tag/transmission/pojo/TrafficMessage.java | 65 +++++++++++++++ .../wrapper/AbstractThreadWrapper.java | 31 +++++-- .../transmission/wrapper/CallableWrapper.java | 13 ++- .../wrapper/RunnableAndCallableWrapper.java | 13 ++- .../transmission/wrapper/RunnableWrapper.java | 37 +++++++-- ....core.plugin.agent.declarer.PluginDeclarer | 2 + .../crossthread/ExecutorInterceptorTest.java | 4 +- ...heduledExecutorServiceInterceptorTest.java | 4 +- 16 files changed, 545 insertions(+), 50 deletions(-) create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/ClientCallImplDeclarer.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/GrpcServerDeclarer.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/enumeration/SpecialExecutor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ClientCallImplInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/GrpcServerInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ServerHeaderInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/pojo/TrafficMessage.java diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml index c07dc8fa63..c4d853e27c 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml @@ -26,6 +26,8 @@ 2.0.9 3.2.0 2.6.12 + 1.52.1 + 3.18.0 5.4.0 2.6.0 @@ -72,6 +74,18 @@ ${kafka-clients.version} provided + + io.grpc + grpc-stub + ${grpc.version} + provided + + + com.google.protobuf + protobuf-java + ${protobuf.version} + provided + com.alibaba dubbo diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/ClientCallImplDeclarer.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/ClientCallImplDeclarer.java new file mode 100644 index 0000000000..8fd5307a9b --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/ClientCallImplDeclarer.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc; + +import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; +import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; +import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.ClientCallImplInterceptor; + +/** + * grpc client端Declarer,拦截header参数注入流量标签,支持grpc 1.13+版本 + * + * @author daizhenyu + * @since 2023-08-21 + **/ +public class ClientCallImplDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "io.grpc.internal.ClientCallImpl"; + + private static final String METHOD_NAMES = "start"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAMES), + new ClientCallImplInterceptor())}; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/GrpcServerDeclarer.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/GrpcServerDeclarer.java new file mode 100644 index 0000000000..302e8ddca8 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/rpc/grpc/GrpcServerDeclarer.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc; + +import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; +import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; +import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.GrpcServerInterceptor; + +/** + * grpc server端declare,支持grpc 1.13+版本 + * + * @author daizhenyu + * @since 2023-08-15 + **/ +public class GrpcServerDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder"; + + private static final String METHOD_NAME = "build"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new GrpcServerInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/enumeration/SpecialExecutor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/enumeration/SpecialExecutor.java new file mode 100644 index 0000000000..f1308a3e5c --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/enumeration/SpecialExecutor.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.enumeration; + +/** + * 特殊的线程池枚举类,该类线程池不会使用新线程执行方法 + * + * @author daizhenyu + * @since 2023-09-04 + **/ +public enum SpecialExecutor { + /** + * grpc的ThreadlessExecutor线程池。 + * 对于该线程池,主线程会创建子线程提交线程任务,然后主线程执行。 + * ThreadlessExecutor协调两个线程的任务,形成生产消费的模式 + */ + THREAD_LESS_EXECUTOR("ThreadlessExecutor"), + + /** + * grpc的SynchronizationContext线程池。 + * 对于该线程池,并不会使用新的线程执行线程任务,而是在调用该线程池的线程中依次执行线程池队列的任务。 + */ + SYNCHRONIZATION_CONTEXT("SynchronizationContext"), + + /** + * 用于getSpecialExecutorByName方法寻找不到合适的线程池枚举对象时返回返回值 + */ + OTHER_EXECUTORS("otherExecutors"); + + private final String executorName; + + SpecialExecutor(String executorName) { + this.executorName = executorName; + } + + public String getExecutorName() { + return this.executorName; + } + + /** + * 用于根据线程池名称获取线程池枚举对象 + * + * @param name 线程池名称 + * @return SpecialExecutor对象 + */ + public static SpecialExecutor getSpecialExecutorByName(String name) { + for (SpecialExecutor value : values()) { + if (value.getExecutorName().equals(name)) { + return value; + } + } + return OTHER_EXECUTORS; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/AbstractExecutorInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/AbstractExecutorInterceptor.java index 7e30344e96..39bd596407 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/AbstractExecutorInterceptor.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/AbstractExecutorInterceptor.java @@ -22,6 +22,7 @@ import com.huaweicloud.sermant.core.utils.tag.TrafficData; import com.huaweicloud.sermant.core.utils.tag.TrafficTag; import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; import com.huaweicloud.sermant.tag.transmission.wrapper.CallableWrapper; import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableAndCallableWrapper; import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableWrapper; @@ -61,54 +62,59 @@ public ExecuteContext before(ExecuteContext context) { if (trafficTag == null && trafficData == null) { return context; } - + TrafficMessage trafficMessage = new TrafficMessage(trafficTag, trafficData); + Object executorObject = context.getObject(); + String executorName = executorObject.getClass().getSimpleName(); Object argument = arguments[0]; if (argument instanceof RunnableAndCallableWrapper || argument instanceof RunnableWrapper || argument instanceof CallableWrapper) { return context; } if (argument instanceof Runnable && argument instanceof Callable) { - return buildRunnableAndCallableWrapper(context, arguments, trafficTag, trafficData, argument); + return buildRunnableAndCallableWrapper(context, arguments, trafficMessage, argument, executorName); } if (argument instanceof Runnable) { - return buildRunnableWrapper(context, arguments, trafficTag, trafficData, argument); + return buildRunnableWrapper(context, arguments, trafficMessage, argument, executorName); } if (argument instanceof Callable) { - return buildCallableWrapper(context, arguments, trafficTag, trafficData, argument); + return buildCallableWrapper(context, arguments, trafficMessage, argument, executorName); } return context; } - private ExecuteContext buildCallableWrapper(ExecuteContext context, Object[] arguments, TrafficTag trafficTag, - TrafficData trafficData, Object argument) { - log(argument, trafficTag, trafficData, CallableWrapper.class.getCanonicalName()); - arguments[0] = new CallableWrapper<>((Callable) argument, trafficTag, trafficData, - cannotTransmit); + private ExecuteContext buildCallableWrapper(ExecuteContext context, Object[] arguments, + TrafficMessage trafficMessage, + Object argument, + String executorName) { + log(argument, trafficMessage, CallableWrapper.class.getCanonicalName()); + arguments[0] = new CallableWrapper<>((Callable) argument, trafficMessage, + cannotTransmit, executorName); return context; } - private ExecuteContext buildRunnableWrapper(ExecuteContext context, Object[] arguments, TrafficTag trafficTag, - TrafficData trafficData, Object argument) { - log(argument, trafficTag, trafficData, RunnableWrapper.class.getCanonicalName()); - arguments[0] = new RunnableWrapper<>((Runnable) argument, trafficTag, trafficData, - cannotTransmit); + private ExecuteContext buildRunnableWrapper(ExecuteContext context, Object[] arguments, + TrafficMessage trafficMessage, + Object argument, + String executorName) { + log(argument, trafficMessage, RunnableWrapper.class.getCanonicalName()); + arguments[0] = new RunnableWrapper<>((Runnable) argument, trafficMessage, + cannotTransmit, executorName); return context; } private ExecuteContext buildRunnableAndCallableWrapper(ExecuteContext context, Object[] arguments, - TrafficTag trafficTag, - TrafficData trafficData, Object argument) { - log(argument, trafficTag, trafficData, RunnableAndCallableWrapper.class.getCanonicalName()); + TrafficMessage trafficMessage, Object argument, String executorName) { + log(argument, trafficMessage, RunnableAndCallableWrapper.class.getCanonicalName()); arguments[0] = new RunnableAndCallableWrapper<>((Runnable) argument, (Callable) argument, - trafficTag, trafficData, cannotTransmit); + trafficMessage, cannotTransmit, executorName); return context; } - private void log(Object argument, TrafficTag trafficTag, TrafficData trafficData, String wrapperClassName) { + private void log(Object argument, TrafficMessage trafficMessage, String wrapperClassName) { LOGGER.log(Level.FINE, "Class name is {0}, hash code is {1}, trafficTag is {2}, " + "trafficData is {3}, will be converted to {4}.", new Object[]{argument.getClass().getName(), Integer.toHexString(argument.hashCode()), - trafficTag, trafficData, wrapperClassName}); + trafficMessage.getTrafficTag(), trafficMessage.getTrafficData(), wrapperClassName}); } @Override diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ClientCallImplInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ClientCallImplInterceptor.java new file mode 100644 index 0000000000..ab9e4c2eec --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ClientCallImplInterceptor.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.utils.CollectionUtils; +import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractClientInterceptor; + +import io.grpc.Metadata; + +import java.util.List; + +/** + * grpc使用DynamicMessage调用服务端,拦截header参数注入流量标签 + * + * @author daizhenyu + * @since 2023-08-21 + **/ +public class ClientCallImplInterceptor extends AbstractClientInterceptor { + @Override + protected ExecuteContext doBefore(ExecuteContext context) { + Object[] arguments = context.getArguments(); + + // 被拦截方法的入参数量为2 + if (arguments == null || arguments.length <= 1) { + return context; + } + Object metadataObject = arguments[1]; + if (metadataObject instanceof Metadata) { + injectTrafficTag2Carrier((Metadata) metadataObject); + } + return context; + } + + @Override + protected ExecuteContext doAfter(ExecuteContext context) { + return context; + } + + @Override + protected void injectTrafficTag2Carrier(Metadata header) { + for (String key : tagTransmissionConfig.getTagKeys()) { + List values = TrafficUtils.getTrafficTag().getTag().get(key); + if (CollectionUtils.isEmpty(values)) { + continue; + } + header.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), values.get(0)); + } + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/GrpcServerInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/GrpcServerInterceptor.java new file mode 100644 index 0000000000..84629d23c5 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/GrpcServerInterceptor.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractServerInterceptor; + +import io.grpc.Metadata; +import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * grpc 服务端拦截器 + * + * @author daizhenyu + * @since 2023-08-15 + **/ +public class GrpcServerInterceptor extends AbstractServerInterceptor { + @Override + protected ExecuteContext doBefore(ExecuteContext context) { + Object object = context.getObject(); + if (object == null) { + return context; + } + if (object instanceof ServerBuilder) { + ServerBuilder builder = (ServerBuilder) object; + ServerInterceptor interceptor = new ServerHeaderInterceptor(); + builder.intercept(interceptor); + } + return context; + } + + @Override + protected ExecuteContext doAfter(ExecuteContext context) { + return context; + } + + @Override + protected Map> extractTrafficTagFromCarrier(Metadata metadata) { + return new HashMap<>(); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ServerHeaderInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ServerHeaderInterceptor.java new file mode 100644 index 0000000000..7d13d190ab --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/rpc/grpc/ServerHeaderInterceptor.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc; + +import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; +import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; + +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * grpc内部的server interceptor,从grpc的header中提取流量标签 + * + * @author daizhenyu + * @since 2023-08-15 + **/ +public class ServerHeaderInterceptor implements ServerInterceptor { + protected final TagTransmissionConfig tagTransmissionConfig; + + /** + * 构造方法 + */ + public ServerHeaderInterceptor() { + tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); + } + + /** + * 使用grpc提供的server端拦截器获取header中的流量标签 + */ + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + final Metadata requestHeaders, + ServerCallHandler next) { + // 处理header + if (requestHeaders != null) { + extractTrafficTagFromCarrier(requestHeaders); + } + return next.startCall(new SimpleForwardingServerCall(call) { + }, requestHeaders); + } + + private void extractTrafficTagFromCarrier(Metadata requestHeaders) { + Map> tag = new HashMap<>(); + for (String key : tagTransmissionConfig.getTagKeys()) { + String value = requestHeaders.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + + // 流量标签的value为null时,也需存入本地变量,覆盖原来的value,以防误用旧流量标签 + if (value == null) { + tag.put(key, null); + continue; + } + tag.put(key, Collections.singletonList(value)); + } + TrafficUtils.updateTrafficTag(tag); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/pojo/TrafficMessage.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/pojo/TrafficMessage.java new file mode 100644 index 0000000000..b48a270a8d --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/pojo/TrafficMessage.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.pojo; + +import com.huaweicloud.sermant.core.utils.tag.TrafficData; +import com.huaweicloud.sermant.core.utils.tag.TrafficTag; + +/** + * TrafficMessage,包含TrafficTag和TrafficData + * + * @author daizhenyu + * @since 2023-09-05 + **/ +public class TrafficMessage { + private TrafficTag trafficTag; + + private TrafficData trafficData; + + /** + * 无参构造方法 + */ + public TrafficMessage() { + } + + /** + * 有参构造方法 + * + * @param trafficTag 流量标签 + * @param trafficData 流量数据 + */ + public TrafficMessage(TrafficTag trafficTag, TrafficData trafficData) { + this.trafficTag = trafficTag; + this.trafficData = trafficData; + } + + public void setTrafficData(TrafficData trafficData) { + this.trafficData = trafficData; + } + + public void setTrafficTag(TrafficTag trafficTag) { + this.trafficTag = trafficTag; + } + + public TrafficTag getTrafficTag() { + return trafficTag; + } + + public TrafficData getTrafficData() { + return trafficData; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/AbstractThreadWrapper.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/AbstractThreadWrapper.java index 4ef6a8fea5..acb58d549f 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/AbstractThreadWrapper.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/AbstractThreadWrapper.java @@ -20,6 +20,7 @@ import com.huaweicloud.sermant.core.utils.tag.TrafficData; import com.huaweicloud.sermant.core.utils.tag.TrafficTag; import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; import java.util.concurrent.Callable; import java.util.logging.Level; @@ -35,6 +36,11 @@ public abstract class AbstractThreadWrapper { private static final Logger LOGGER = LoggerFactory.getLogger(); + /** + * 执行该线程对象的线程池名称 + */ + protected final String executorName; + private final Runnable runnable; private final Callable callable; @@ -50,22 +56,23 @@ public abstract class AbstractThreadWrapper { * * @param runnable runnable * @param callable callable - * @param trafficTag 请求标记 - * @param trafficData 请求数据 + * @param trafficMessage 流量信息 * @param cannotTransmit 执行方法之前是否需要删除线程变量 + * @param executorName */ - public AbstractThreadWrapper(Runnable runnable, Callable callable, TrafficTag trafficTag, - TrafficData trafficData, boolean cannotTransmit) { + public AbstractThreadWrapper(Runnable runnable, Callable callable, TrafficMessage trafficMessage, + boolean cannotTransmit, String executorName) { this.runnable = runnable; this.callable = callable; if (cannotTransmit) { this.trafficTag = null; this.trafficData = null; } else { - this.trafficTag = trafficTag; - this.trafficData = trafficData; + this.trafficTag = trafficMessage.getTrafficTag(); + this.trafficData = trafficMessage.getTrafficData(); } this.cannotTransmit = cannotTransmit; + this.executorName = executorName; } /** @@ -95,7 +102,12 @@ public T call() throws Exception { } } - private void before(Object obj) { + /** + * 线程对象执行的前置方法 + * + * @param obj 线程对象 + */ + protected void before(Object obj) { if (cannotTransmit) { // 当开启普通线程透传,不开启线程池透传时,需要在执行方法之前,删除由InheritableThreadLocal透传的数据 TrafficUtils.removeTrafficTag(); @@ -113,7 +125,10 @@ private void before(Object obj) { Integer.toHexString(obj.hashCode()), trafficTag, trafficData}); } - private void after() { + /** + * 线程对象执行的后置方法 + */ + protected void after() { TrafficUtils.removeTrafficTag(); TrafficUtils.removeTrafficData(); } diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/CallableWrapper.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/CallableWrapper.java index a269a71e3b..720132e042 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/CallableWrapper.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/CallableWrapper.java @@ -16,8 +16,7 @@ package com.huaweicloud.sermant.tag.transmission.wrapper; -import com.huaweicloud.sermant.core.utils.tag.TrafficData; -import com.huaweicloud.sermant.core.utils.tag.TrafficTag; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; import java.util.concurrent.Callable; @@ -33,12 +32,12 @@ public class CallableWrapper extends AbstractThreadWrapper implements Call * 构造方法 * * @param callable callable - * @param trafficTag 请求标记 - * @param trafficData 请求数据 + * @param trafficMessage 流量信息 * @param cannotTransmit 执行方法之前是否需要删除线程变量 + * @param executorName 线程池名称 */ - public CallableWrapper(Callable callable, TrafficTag trafficTag, TrafficData trafficData, - boolean cannotTransmit) { - super(null, callable, trafficTag, trafficData, cannotTransmit); + public CallableWrapper(Callable callable, TrafficMessage trafficMessage, + boolean cannotTransmit, String executorName) { + super(null, callable, trafficMessage, cannotTransmit, executorName); } } \ No newline at end of file diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableAndCallableWrapper.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableAndCallableWrapper.java index 2ced71f010..7efe252b06 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableAndCallableWrapper.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableAndCallableWrapper.java @@ -16,8 +16,7 @@ package com.huaweicloud.sermant.tag.transmission.wrapper; -import com.huaweicloud.sermant.core.utils.tag.TrafficData; -import com.huaweicloud.sermant.core.utils.tag.TrafficTag; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; import java.util.concurrent.Callable; @@ -34,12 +33,12 @@ public class RunnableAndCallableWrapper extends AbstractThreadWrapper impl * * @param runnable runnable * @param callable callable - * @param trafficTag 请求标记 - * @param trafficData 请求数据 + * @param trafficMessage 流量信息 * @param cannotTransmit 执行方法之前是否需要删除线程变量 + * @param executorName 线程池名称 */ - public RunnableAndCallableWrapper(Runnable runnable, Callable callable, TrafficTag trafficTag, - TrafficData trafficData, boolean cannotTransmit) { - super(runnable, callable, trafficTag, trafficData, cannotTransmit); + public RunnableAndCallableWrapper(Runnable runnable, Callable callable, TrafficMessage trafficMessage, + boolean cannotTransmit, String executorName) { + super(runnable, callable, trafficMessage, cannotTransmit, executorName); } } \ No newline at end of file diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableWrapper.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableWrapper.java index 3ff980d49f..7c7aae8a36 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableWrapper.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/wrapper/RunnableWrapper.java @@ -16,8 +16,8 @@ package com.huaweicloud.sermant.tag.transmission.wrapper; -import com.huaweicloud.sermant.core.utils.tag.TrafficData; -import com.huaweicloud.sermant.core.utils.tag.TrafficTag; +import com.huaweicloud.sermant.tag.transmission.enumeration.SpecialExecutor; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; /** * Runnable包装类 @@ -31,11 +31,36 @@ public class RunnableWrapper extends AbstractThreadWrapper implements Runn * 构造方法 * * @param runnable runnable - * @param trafficTag 请求标记 - * @param trafficData 请求数据 + * @param trafficMessage 流量信息 * @param cannotTransmit 执行方法之前是否需要删除线程变量 + * @param executorName 线程池名称 */ - public RunnableWrapper(Runnable runnable, TrafficTag trafficTag, TrafficData trafficData, boolean cannotTransmit) { - super(runnable, null, trafficTag, trafficData, cannotTransmit); + public RunnableWrapper(Runnable runnable, TrafficMessage trafficMessage, boolean cannotTransmit, + String executorName) { + super(runnable, null, trafficMessage, cannotTransmit, executorName); + } + + @Override + protected void before(Object obj) { + // 处理特殊线程池,以下两类线程池会在调用对应线程池执行方法的线程中依次执行线程池队列中的任务,不需要重新设置流量标签 + switch (SpecialExecutor.getSpecialExecutorByName(this.executorName)) { + case THREAD_LESS_EXECUTOR: + case SYNCHRONIZATION_CONTEXT: + return; + default: + super.before(obj); + } + } + + @Override + protected void after() { + // 处理特殊线程池,以下两类线程池会在调用对应线程池执行方法的线程中依次执行线程池队列中的任务,防止误删流量标签 + switch (SpecialExecutor.getSpecialExecutorByName(this.executorName)) { + case THREAD_LESS_EXECUTOR: + case SYNCHRONIZATION_CONTEXT: + return; + default: + super.after(); + } } } \ No newline at end of file diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer index f77ab4493f..d1229416d8 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -25,6 +25,8 @@ com.huaweicloud.sermant.tag.transmission.declarers.rpc.dubbo.ApacheDubboConsumer com.huaweicloud.sermant.tag.transmission.declarers.rpc.dubbo.AlibabaDubboConsumerDeclarer com.huaweicloud.sermant.tag.transmission.declarers.rpc.sofarpc.SofaRpcClientDeclarer com.huaweicloud.sermant.tag.transmission.declarers.rpc.sofarpc.SofaRpcServerDeclarer +com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc.GrpcServerDeclarer +com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc.ClientCallImplDeclarer com.huaweicloud.sermant.tag.transmission.declarers.rpc.servicecomb.ServiceCombRpcConsumerDeclarer com.huaweicloud.sermant.tag.transmission.declarers.rpc.servicecomb.ServiceCombRpcProviderDeclarer com.huaweicloud.sermant.tag.transmission.declarers.mq.kafka.KafkaProducerDeclarer diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ExecutorInterceptorTest.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ExecutorInterceptorTest.java index 78d42402e8..741a24ff86 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ExecutorInterceptorTest.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ExecutorInterceptorTest.java @@ -21,6 +21,7 @@ import com.huaweicloud.sermant.tag.transmission.BaseTest; import com.huaweicloud.sermant.tag.transmission.RunnableAndCallable; import com.huaweicloud.sermant.tag.transmission.interceptors.crossthread.ExecutorInterceptor; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; import com.huaweicloud.sermant.tag.transmission.wrapper.CallableWrapper; import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableAndCallableWrapper; import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableWrapper; @@ -68,7 +69,8 @@ public void testBefore() { TrafficUtils.updateTrafficTag(Collections.singletonMap("foo", Collections.singletonList("bar"))); // 测试已经包装过了 - RunnableWrapper runnableWrapper = new RunnableWrapper<>(null, null, null, false); + TrafficMessage trafficMessage = new TrafficMessage(null, null); + RunnableWrapper runnableWrapper = new RunnableWrapper<>(null, trafficMessage, false, null); arguments[0] = runnableWrapper; interceptor.before(context); Assert.assertEquals(runnableWrapper, context.getArguments()[0]); diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ScheduledExecutorServiceInterceptorTest.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ScheduledExecutorServiceInterceptorTest.java index 4d008a40ef..ca4a4d658e 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ScheduledExecutorServiceInterceptorTest.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/test/java/com/huaweicloud/sermant/tag/transmission/interceptors/crossthread/ScheduledExecutorServiceInterceptorTest.java @@ -21,6 +21,7 @@ import com.huaweicloud.sermant.tag.transmission.BaseTest; import com.huaweicloud.sermant.tag.transmission.RunnableAndCallable; import com.huaweicloud.sermant.tag.transmission.interceptors.crossthread.ScheduledExecutorServiceInterceptor; +import com.huaweicloud.sermant.tag.transmission.pojo.TrafficMessage; import com.huaweicloud.sermant.tag.transmission.wrapper.CallableWrapper; import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableAndCallableWrapper; import com.huaweicloud.sermant.tag.transmission.wrapper.RunnableWrapper; @@ -68,7 +69,8 @@ public void testBefore() { TrafficUtils.updateTrafficTag(Collections.singletonMap("foo", Collections.singletonList("bar"))); // 测试已经包装过了 - RunnableWrapper runnableWrapper = new RunnableWrapper<>(null, null, null, false); + TrafficMessage trafficMessage = new TrafficMessage(null, null); + RunnableWrapper runnableWrapper = new RunnableWrapper<>(null, trafficMessage, false, null); arguments[0] = runnableWrapper; interceptor.before(context); Assert.assertEquals(runnableWrapper, context.getArguments()[0]);