Skip to content

Commit

Permalink
【feature】流量标签透传特性
Browse files Browse the repository at this point in the history
1.支持kafka消费以及生产过程透传
2.支持动态配置
  • Loading branch information
lilai23 committed Jul 21, 2023
1 parent 8de452d commit 2949c67
Show file tree
Hide file tree
Showing 15 changed files with 622 additions and 31 deletions.
4 changes: 4 additions & 0 deletions sermant-plugins/sermant-tag-transmission/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<modules>
<module>tag-transmission-plugin</module>
<module>tag-transmission-service</module>
</modules>

<properties>
Expand All @@ -31,18 +32,21 @@
</activation>
<modules>
<module>tag-transmission-plugin</module>
<module>tag-transmission-service</module>
</modules>
</profile>
<profile>
<id>test</id>
<modules>
<module>tag-transmission-plugin</module>
<module>tag-transmission-service</module>
</modules>
</profile>
<profile>
<id>release</id>
<modules>
<module>tag-transmission-plugin</module>
<module>tag-transmission-service</module>
</modules>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<config.skip.flag>false</config.skip.flag>
<package.plugin.type>plugin</package.plugin.type>
<apache-httpclient.version>4.3</apache-httpclient.version>
<kafka-clients.version>2.7.0</kafka-clients.version>
</properties>

<dependencies>
Expand All @@ -25,17 +26,18 @@
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.huaweicloud.sermant</groupId>
<artifactId>sermant-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${apache-httpclient.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,12 @@ public List<String> getTagKeys() {
public void setTagKeys(List<String> tagKeys) {
this.tagKeys = tagKeys;
}

@Override
public String toString() {
return "TagTransmissionConfig{"
+ "enabled=" + enabled
+ ", tagKeys=" + tagKeys
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.KafkaConsumerRecordInterceptor;

/**
* kafka获取消息内容的拦截点声明,支持1.x, 2.x, 3.x
*
* @author lilai
* @since 2023-07-18
*/
public class KafkaConsumerRecordDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名
*/
private static final String ENHANCE_CLASSES = "org.apache.kafka.clients.consumer.ConsumerRecord";

/**
* 拦截类的全限定名
*/
private static final String INTERCEPT_CLASS = KafkaConsumerRecordInterceptor.class.getCanonicalName();

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASSES);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals("value"), INTERCEPT_CLASS)
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.declarers;

import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher;
import com.huaweicloud.sermant.tag.transmission.interceptors.KafkaProducerInterceptor;

/**
* kafka生产消息增强拦截点声明,支持1.x, 2.x, 3.x
*
* @author lilai
* @since 2023-07-18
*/
public class KafkaProviderDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名
*/
private static final String ENHANCE_CLASSES = "org.apache.kafka.clients.producer.KafkaProducer";

/**
* 拦截类的全限定名
*/
private static final String INTERCEPT_CLASS = KafkaProducerInterceptor.class.getCanonicalName();

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASSES);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals("doSend"), INTERCEPT_CLASS)
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager;
import com.huaweicloud.sermant.core.utils.MapUtils;
import com.huaweicloud.sermant.core.utils.tag.TrafficTag;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig;

/**
* 客户端拦截器抽象类,获取当前线程的流量标签并透传至下游进程,适用于http客户端/rpc客户端/消息队列生产者
*
* @author lilai
* @since 2023-07-18
*/
public abstract class AbstractClientInterceptor extends AbstractInterceptor {
protected final TagTransmissionConfig tagTransmissionConfig;

/**
* 构造器
*/
public AbstractClientInterceptor() {
this.tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class);
}

@Override
public ExecuteContext before(ExecuteContext context) {
if (!tagTransmissionConfig.isEnabled()) {
return context;
}

TrafficTag trafficTag = TrafficUtils.getTrafficTag();
if (trafficTag == null || MapUtils.isEmpty(trafficTag.getTag())) {
return context;
}

return doBefore(context);
}

@Override
public ExecuteContext after(ExecuteContext context) {
return doAfter(context);
}

/**
* 前置触发点
*
* @param context 执行上下文
* @return 执行上下文
*/
protected abstract ExecuteContext doBefore(ExecuteContext context);

/**
* 后置触发点
*
* @param context 执行上下文
* @return 执行上下文
*/
protected abstract ExecuteContext doAfter(ExecuteContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.interceptors;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager;
import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig;

/**
* 服务端拦截器抽象类,获取跨进程的流量标签并在本进程传递,适用于http服务端/rpc服务端/消息队列消费者
*
* @author lilai
* @since 2023-07-18
*/
public abstract class AbstractServerInterceptor extends AbstractInterceptor {
protected final TagTransmissionConfig tagTransmissionConfig;

/**
* 构造器
*/
public AbstractServerInterceptor() {
this.tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class);
}

@Override
public ExecuteContext before(ExecuteContext context) {
if (!tagTransmissionConfig.isEnabled()) {
return context;
}
return doBefore(context);
}

@Override
public ExecuteContext after(ExecuteContext context) {
return doAfter(context);
}

/**
* 前置触发点
*
* @param context 执行上下文
* @return 执行上下文
*/
protected abstract ExecuteContext doBefore(ExecuteContext context);

/**
* 后置触发点
*
* @param context 执行上下文
* @return 执行上下文
*/
protected abstract ExecuteContext doAfter(ExecuteContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@
package com.huaweicloud.sermant.tag.transmission.interceptors;

import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext;
import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager;
import com.huaweicloud.sermant.core.utils.CollectionUtils;
import com.huaweicloud.sermant.core.utils.tag.TrafficTag;
import com.huaweicloud.sermant.core.utils.tag.TrafficUtils;
import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig;

import org.apache.http.HttpRequest;

Expand All @@ -34,32 +30,14 @@
* @author lilai
* @since 2023-07-17
*/
public class HttpClient4xInterceptor extends AbstractInterceptor {
private final TagTransmissionConfig tagTransmissionConfig;

/**
* 构造器
*/
public HttpClient4xInterceptor() {
tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class);
}

public class HttpClient4xInterceptor extends AbstractClientInterceptor {
@Override
public ExecuteContext before(ExecuteContext context) {
if (!tagTransmissionConfig.isEnabled()) {
return context;
}

TrafficTag trafficTag = TrafficUtils.getTrafficTag();
if (trafficTag == null) {
return context;
}

public ExecuteContext doBefore(ExecuteContext context) {
Object httpRequestObject = context.getArguments()[1];
if (httpRequestObject instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) httpRequestObject;
for (String key : tagTransmissionConfig.getTagKeys()) {
List<String> values = trafficTag.getTag().get(key);
List<String> values = TrafficUtils.getTrafficTag().getTag().get(key);
if (CollectionUtils.isEmpty(values)) {
continue;
}
Expand All @@ -72,7 +50,7 @@ public ExecuteContext before(ExecuteContext context) {
}

@Override
public ExecuteContext after(ExecuteContext context) {
public ExecuteContext doAfter(ExecuteContext context) {
return context;
}
}
Loading

0 comments on commit 2949c67

Please sign in to comment.