Skip to content

Commit

Permalink
Sermant流量标签透传插件支持grpc框架,grpc框架版本为1.13+
Browse files Browse the repository at this point in the history
  • Loading branch information
daizhenyu committed Aug 23, 2023
1 parent 4fc50af commit fc791c6
Show file tree
Hide file tree
Showing 15 changed files with 630 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 流量中包含的请求信息
Expand All @@ -32,6 +33,8 @@ public class TrafficUtils {

private static ThreadLocal<TrafficData> data = new ThreadLocal<>();

private static AtomicBoolean isDynamicMessage = new AtomicBoolean();

private TrafficUtils() {
}

Expand Down Expand Up @@ -117,4 +120,22 @@ public static void setTrafficData(TrafficData value) {
public static void removeTrafficData() {
data.remove();
}

/**
* 设置grpc是否使用DynamicMessage调用服务端
*
* @param flag
*/
public static void setIsDynamicMessage(boolean flag) {
isDynamicMessage.set(flag);
}

/**
* 查询grpc是否使用DynamicMessage调用服务端
*
* @return boolean
*/
public static boolean getIsDynamicMessage() {
return isDynamicMessage.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
<powermock.version>2.0.9</powermock.version>
<dubbo.version>3.2.0</dubbo.version>
<alibaba.dubbo.version>2.6.12</alibaba.dubbo.version>
<grpc.version>1.52.1</grpc.version>
<protobuf.version>3.18.0</protobuf.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -68,6 +70,18 @@
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.ClientUseDynamicMessageInterceptor;

/**
* grpc使用DynamicMessage方式调用服务端declarer,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientUseDynamicMessageDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls";

private static final String[] METHOD_NAMES = {"asyncUnaryCall", "asyncServerStreamingCall", "futureUnaryCall"};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_NAMES),
new ClientUseDynamicMessageInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.ClientUseDynamicMessageImplInterceptor;

/**
* grpc使用DynamicMessage调用服务端,拦截header参数注入流量标签,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientUseDynamicMessageImplDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.internal.ClientCallImpl";

private static final String METHOD_NAMES = "start";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAMES),
new ClientUseDynamicMessageImplInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.GrpcClientInterceptor;

/**
* grpc 客户端 declarer,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-15
**/
public class GrpcClientDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.stub.AbstractStub";

private static final int CONSTRUCTOR_PARAM_COUNT = 2;

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor().
and(MethodMatcher.paramCountEquals(CONSTRUCTOR_PARAM_COUNT)),
new GrpcClientInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.GrpcServerInterceptor;

/**
* grpc server端declare,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-15
**/
public class GrpcServerDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";

private static final String METHOD_NAME = "build";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new GrpcServerInterceptor())
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,37 @@ public ExecuteContext before(ExecuteContext context) {
private ExecuteContext buildCallableWrapper(ExecuteContext context, Object[] arguments, TrafficTag trafficTag,
TrafficData trafficData, Object argument) {
log(argument, trafficTag, trafficData, CallableWrapper.class.getCanonicalName());
arguments[0] = new CallableWrapper<>((Callable<?>) argument, trafficTag, trafficData,
CallableWrapper<?> callableWrapper = new CallableWrapper<>((Callable<?>) argument, trafficTag, trafficData,
cannotTransmit);

// 传入父线程名称
callableWrapper.setParentThreadName(Thread.currentThread().getName());
arguments[0] = callableWrapper;
return context;
}

private ExecuteContext buildRunnableWrapper(ExecuteContext context, Object[] arguments, TrafficTag trafficTag,
TrafficData trafficData, Object argument) {
log(argument, trafficTag, trafficData, RunnableWrapper.class.getCanonicalName());
arguments[0] = new RunnableWrapper<>((Runnable) argument, trafficTag, trafficData,
RunnableWrapper<?> runnableWrapper = new RunnableWrapper<>((Runnable) argument, trafficTag, trafficData,
cannotTransmit);

// 传入父线程名称
runnableWrapper.setParentThreadName(Thread.currentThread().getName());
arguments[0] = runnableWrapper;
return context;
}

private ExecuteContext buildRunnableAndCallableWrapper(ExecuteContext context, Object[] arguments,
TrafficTag trafficTag,
TrafficData trafficData, Object argument) {
log(argument, trafficTag, trafficData, RunnableAndCallableWrapper.class.getCanonicalName());
arguments[0] = new RunnableAndCallableWrapper<>((Runnable) argument, (Callable<?>) argument,
trafficTag, trafficData, cannotTransmit);
RunnableAndCallableWrapper<?> runnableAndCallableWrapper = new RunnableAndCallableWrapper<>((Runnable) argument,
(Callable<?>) argument, trafficTag, trafficData, cannotTransmit);

// 传入父线程名称
runnableAndCallableWrapper.setParentThreadName(Thread.currentThread().getName());
arguments[0] = runnableAndCallableWrapper;
return context;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc;

import com.huaweicloud.sermant.core.utils.MapUtils;
import com.huaweicloud.sermant.core.utils.tag.TrafficTag;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

import java.util.List;
import java.util.Map;

/**
* grpc内部的client interceptor, 将流量标签注入grpc的header中
*
* @author daizhenyu
* @since 2023-08-15
**/
public class ClientMetadataInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 注入流量标签
TrafficTag trafficTag = TrafficUtils.getTrafficTag();
if (headers != null || trafficTag == null || MapUtils.isEmpty(trafficTag.getTag())) {
insertTag2Header(headers, TrafficUtils.getTrafficTag().getTag());
}
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) { }, headers);
}
};
}

private void insertTag2Header(Metadata header, Map<String, List<String>> tag) {
for (Map.Entry<String, List<String>> entry : tag.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
header.put(Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER), entry.getValue().get(0));
}
}
}
Loading

0 comments on commit fc791c6

Please sign in to comment.