Skip to content

Commit

Permalink
Sermant流量标签透传插件支持grpc框架,grpc框架版本为1.13+
Browse files Browse the repository at this point in the history
  • Loading branch information
daizhenyu committed Sep 1, 2023
1 parent 5814e18 commit e12b92d
Show file tree
Hide file tree
Showing 15 changed files with 729 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
<powermock.version>2.0.9</powermock.version>
<dubbo.version>3.2.0</dubbo.version>
<alibaba.dubbo.version>2.6.12</alibaba.dubbo.version>
<grpc.version>1.52.1</grpc.version>
<protobuf.version>3.18.0</protobuf.version>
<sofa-rpc.version>5.4.0</sofa-rpc.version>
<servicecomb-java-chassis.version>2.6.0</servicecomb-java-chassis.version>
</properties>
Expand Down Expand Up @@ -72,6 +74,18 @@
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.ClientCallImplInterceptor;

/**
* grpc使用DynamicMessage调用服务端,拦截header参数注入流量标签,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientCallImplDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.internal.ClientCallImpl";

private static final String METHOD_NAMES = "start";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAMES),
new ClientCallImplInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.ClientCallsInterceptor;

/**
* grpc使用DynamicMessage方式调用服务端,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientCallsDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls";

private static final String[] METHOD_NAMES = {"asyncUnaryCall", "asyncServerStreamingCall", "futureUnaryCall"};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameContains(METHOD_NAMES),
new ClientCallsInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.GrpcClientInterceptor;

/**
* grpc 客户端 declarer,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-15
**/
public class GrpcClientDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.stub.AbstractStub";

private static final int CONSTRUCTOR_PARAM_COUNT = 2;

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor().
and(MethodMatcher.paramCountEquals(CONSTRUCTOR_PARAM_COUNT)),
new GrpcClientInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc.GrpcServerInterceptor;

/**
* grpc server端declare,支持grpc 1.13+版本
*
* @author daizhenyu
* @since 2023-08-15
**/
public class GrpcServerDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";

private static final String METHOD_NAME = "build";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), new GrpcServerInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.utils.CollectionUtils;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractClientInterceptor;
import com.huaweicloud.sermant.tag.transmission.utils.GrpcDynamicMessageUtils;

import io.grpc.Metadata;

import java.util.List;

/**
* grpc使用DynamicMessage调用服务端,拦截header参数注入流量标签
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientCallImplInterceptor extends AbstractClientInterceptor<Metadata> {
@Override
protected ExecuteContext doBefore(ExecuteContext context) {
Object[] arguments = context.getArguments();

// 被拦截方法的入参数量为2
if (arguments == null || arguments.length <= 1) {
return context;
}
if (!GrpcDynamicMessageUtils.getState().getIsDynamicMessage()) {
return context;
}
GrpcDynamicMessageUtils.getState().setIsDynamicMessage(false);
Object metadataObject = arguments[1];
if (metadataObject instanceof Metadata) {
injectTrafficTag2Carrier((Metadata) metadataObject);
}
return context;
}

@Override
protected ExecuteContext doAfter(ExecuteContext context) {
return context;
}

@Override
protected void injectTrafficTag2Carrier(Metadata header) {
for (String key : tagTransmissionConfig.getTagKeys()) {
List<String> values = TrafficUtils.getTrafficTag().getTag().get(key);
if (CollectionUtils.isEmpty(values)) {
continue;
}
header.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), values.get(0));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.grpc;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractClientInterceptor;
import com.huaweicloud.sermant.tag.transmission.utils.GrpcDynamicMessageUtils;

import com.google.protobuf.DynamicMessage;

/**
* grpc使用DynamicMessage方式调用服务端
*
* @author daizhenyu
* @since 2023-08-21
**/
public class ClientCallsInterceptor extends AbstractClientInterceptor {
@Override
protected ExecuteContext doBefore(ExecuteContext context) {
Object[] arguments = context.getArguments();

// 被拦截方法的入参数量为2
if (arguments == null || arguments.length <= 1) {
return context;
}

if (arguments[1] instanceof DynamicMessage) {
// 用于第二拦截点判定本次调用是否使用DynamicMessage
GrpcDynamicMessageUtils.getState().setIsDynamicMessage(true);
}
return context;
}

@Override
protected ExecuteContext doAfter(ExecuteContext context) {
return context;
}

@Override
protected void injectTrafficTag2Carrier(Object o) {
}
}
Loading

0 comments on commit e12b92d

Please sign in to comment.