Skip to content

Commit

Permalink
流量标签透传支持servicecombrpc2.0+版本
Browse files Browse the repository at this point in the history
  • Loading branch information
daizhenyu committed Aug 28, 2023
1 parent e3ba25d commit 3395c6f
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<dubbo.version>3.2.0</dubbo.version>
<alibaba.dubbo.version>2.6.12</alibaba.dubbo.version>
<sofa-rpc.version>5.4.0</sofa-rpc.version>
<servicecomb-java-chassis.version>2.6.0</servicecomb-java-chassis.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -89,6 +90,12 @@
<version>${sofa-rpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.servicecomb</groupId>
<artifactId>java-chassis-core</artifactId>
<version>${servicecomb-java-chassis.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.servicecomb;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.servicecomb.ServiceCombRpcConsumerInterceptor;

/**
* servicecombRPC consumer端declarer,支持servicecomb2.x版本
*
* @author daizhenyu
* @since 2023-08-26
**/
public class ServiceCombRpcConsumerDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.servicecomb.core.handler.impl.TransportClientHandler";

private static final String METHOD_NAME = "handle";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME),
new ServiceCombRpcConsumerInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers.rpc.servicecomb;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.rpc.servicecomb.ServiceCombRpcProviderInterceptor;

/**
* servicecombRPC provider端declarer,支持servicecomb2.x版本
*
* @author daizhenyu
* @since 2023-08-26
**/
public class ServiceCombRpcProviderDeclarer extends AbstractPluginDeclarer {
private static final String ENHANCE_CLASS = "org.apache.servicecomb.core.handler.impl.ProducerOperationHandler";

private static final String METHOD_NAME = "handle";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME),
new ServiceCombRpcProviderInterceptor())};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.servicecomb;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.utils.CollectionUtils;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractClientInterceptor;

import org.apache.servicecomb.core.Invocation;

import java.util.List;

/**
* servicecombRPC consumer端interceptor,支持servicecomb2.x版本
*
* @author daizhenyu
* @since 2023-08-26
**/
public class ServiceCombRpcConsumerInterceptor extends AbstractClientInterceptor<Invocation> {
private static final String CONSUMER_KEY = "consumer_key";

private static final String CONSUMER_VALUE = "consumer_value";

@Override
protected ExecuteContext doBefore(ExecuteContext context) {
Object[] arguments = context.getArguments();
if (arguments == null || arguments.length == 0) {
return context;
}
Object invocationObject = arguments[0];
if (invocationObject instanceof Invocation) {
Invocation invocation = (Invocation) invocationObject;

// 传入servicecombrpc的header中,用于provider端识别本次调用是否来自servicecombrpc consumer端
invocation.getContext().put(CONSUMER_KEY, CONSUMER_VALUE);
injectTrafficTag2Carrier(invocation);
}
return context;
}

@Override
protected ExecuteContext doAfter(ExecuteContext context) {
return context;
}

/**
* 向Invocation中添加流量标签
*
* @param invocation servicecomb rpc客服端 标签传递载体
*/
@Override
protected void injectTrafficTag2Carrier(Invocation invocation) {
for (String key : tagTransmissionConfig.getTagKeys()) {
List<String> values = TrafficUtils.getTrafficTag().getTag().get(key);
if (CollectionUtils.isEmpty(values)) {
continue;
}
invocation.getContext().put(key, values.get(0));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors.rpc.servicecomb;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.interceptors.AbstractServerInterceptor;

import org.apache.servicecomb.core.Invocation;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* servicecombRPC provider端interceptor,支持servicecomb2.x版本
*
* @author daizhenyu
* @since 2023-08-26
**/
public class ServiceCombRpcProviderInterceptor extends AbstractServerInterceptor<Invocation> {
private static final String CONSUMER_KEY = "consumer_key";

@Override
protected ExecuteContext doBefore(ExecuteContext context) {
Object[] arguments = context.getArguments();
if (arguments == null || arguments.length == 0) {
return context;
}
Object invocationObject = arguments[0];
if (invocationObject instanceof Invocation) {
Invocation invocation = (Invocation) invocationObject;

// 判断本次rpc调用是否来自servicecombrpc consumer端
if (invocation.getContext().containsKey(CONSUMER_KEY)
|| invocation.getRequestEx().getHeader(CONSUMER_KEY) != null) {
TrafficUtils.updateTrafficTag(extractTrafficTagFromCarrier(invocation));
}
}
return context;
}

@Override
protected ExecuteContext doAfter(ExecuteContext context) {
TrafficUtils.removeTrafficTag();
return context;
}

@Override
public ExecuteContext onThrow(ExecuteContext context) {
TrafficUtils.removeTrafficTag();
return context;
}

/**
* 从Invocation中解析流量标签
*
* @param invocation servicecomb rpc服务端的流量标签载体
* @return 流量标签
*/
@Override
protected Map<String, List<String>> extractTrafficTagFromCarrier(Invocation invocation) {
Map<String, List<String>> tag = new HashMap<>();
for (String key : tagTransmissionConfig.getTagKeys()) {
if (invocation.getContext().containsKey(key) && invocation.getContext().get(key) != null) {
// cse 场景
tag.put(key, Collections.singletonList(invocation.getContext().get(key)));
continue;
}

// 非cse 场景
String value = invocation.getRequestEx().getHeader(key);
if (value != null) {
tag.put(key, Collections.singletonList(value));
}

// 流量标签的value为null时,也需存入本地变量,覆盖原来的value,以防误用旧流量标签
tag.put(key, null);
}
return tag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ com.huaweicloud.sermant.tag.transmission.declarers.rpc.dubbo.ApacheDubboConsumer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.dubbo.AlibabaDubboConsumerDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.sofarpc.SofaRpcClientDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.sofarpc.SofaRpcServerDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.servicecomb.ServiceCombRpcConsumerDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.rpc.servicecomb.ServiceCombRpcProviderDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.mq.kafka.KafkaProducerDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.http.client.httpclient.HttpClient3xDeclarer
com.huaweicloud.sermant.tag.transmission.declarers.http.client.okhttp.OkHttp2xDeclarer
Expand Down

0 comments on commit 3395c6f

Please sign in to comment.