From 2949c679016e9a31b9c689ce5bac411f0b4898f3 Mon Sep 17 00:00:00 2001 From: lilai Date: Thu, 20 Jul 2023 17:32:26 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90feature=E3=80=91=E6=B5=81=E9=87=8F?= =?UTF-8?q?=E6=A0=87=E7=AD=BE=E9=80=8F=E4=BC=A0=E7=89=B9=E6=80=A7=201.?= =?UTF-8?q?=E6=94=AF=E6=8C=81kafka=E6=B6=88=E8=B4=B9=E4=BB=A5=E5=8F=8A?= =?UTF-8?q?=E7=94=9F=E4=BA=A7=E8=BF=87=E7=A8=8B=E9=80=8F=E4=BC=A0=202.?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=8A=A8=E6=80=81=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sermant-tag-transmission/pom.xml | 4 + .../tag-transmission-plugin/pom.xml | 12 +-- .../config/TagTransmissionConfig.java | 8 ++ .../KafkaConsumerRecordDeclarer.java | 53 ++++++++++++ .../declarers/KafkaProviderDeclarer.java | 53 ++++++++++++ .../AbstractClientInterceptor.java | 77 +++++++++++++++++ .../AbstractServerInterceptor.java | 68 +++++++++++++++ .../interceptors/HttpClient4xInterceptor.java | 30 +------ .../KafkaConsumerRecordInterceptor.java | 84 +++++++++++++++++++ .../KafkaProducerInterceptor.java | 70 ++++++++++++++++ ....core.plugin.agent.declarer.PluginDeclarer | 2 + .../tag-transmission-service/pom.xml | 47 +++++++++++ .../listener/TagConfigListener.java | 79 +++++++++++++++++ .../service/TagConfigService.java | 49 +++++++++++ ....sermant.core.plugin.service.PluginService | 17 ++++ 15 files changed, 622 insertions(+), 31 deletions(-) create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaConsumerRecordDeclarer.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaProviderDeclarer.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractClientInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractServerInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaConsumerRecordInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaProducerInterceptor.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-service/pom.xml create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/listener/TagConfigListener.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/service/TagConfigService.java create mode 100644 sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.service.PluginService diff --git a/sermant-plugins/sermant-tag-transmission/pom.xml b/sermant-plugins/sermant-tag-transmission/pom.xml index 8c5c66968f..adf60f9519 100644 --- a/sermant-plugins/sermant-tag-transmission/pom.xml +++ b/sermant-plugins/sermant-tag-transmission/pom.xml @@ -14,6 +14,7 @@ tag-transmission-plugin + tag-transmission-service @@ -31,18 +32,21 @@ tag-transmission-plugin + tag-transmission-service test tag-transmission-plugin + tag-transmission-service release tag-transmission-plugin + tag-transmission-service diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml index df1c0f85ff..d4153f05dd 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/pom.xml @@ -17,6 +17,7 @@ false plugin 4.3 + 2.7.0 @@ -25,17 +26,18 @@ sermant-agentcore-core provided - - com.huaweicloud.sermant - sermant-common - provided - org.apache.httpcomponents httpclient ${apache-httpclient.version} provided + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + provided + diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/config/TagTransmissionConfig.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/config/TagTransmissionConfig.java index edde96f0be..d6bc54cee2 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/config/TagTransmissionConfig.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/config/TagTransmissionConfig.java @@ -55,4 +55,12 @@ public List getTagKeys() { public void setTagKeys(List tagKeys) { this.tagKeys = tagKeys; } + + @Override + public String toString() { + return "TagTransmissionConfig{" + + "enabled=" + enabled + + ", tagKeys=" + tagKeys + + '}'; + } } diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaConsumerRecordDeclarer.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaConsumerRecordDeclarer.java new file mode 100644 index 0000000000..dd70a47858 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaConsumerRecordDeclarer.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.declarers; + +import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; +import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; +import com.huaweicloud.sermant.tag.transmission.interceptors.KafkaConsumerRecordInterceptor; + +/** + * kafka获取消息内容的拦截点声明,支持1.x, 2.x, 3.x + * + * @author lilai + * @since 2023-07-18 + */ +public class KafkaConsumerRecordDeclarer extends AbstractPluginDeclarer { + /** + * 增强类的全限定名 + */ + private static final String ENHANCE_CLASSES = "org.apache.kafka.clients.consumer.ConsumerRecord"; + + /** + * 拦截类的全限定名 + */ + private static final String INTERCEPT_CLASS = KafkaConsumerRecordInterceptor.class.getCanonicalName(); + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("value"), INTERCEPT_CLASS) + }; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaProviderDeclarer.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaProviderDeclarer.java new file mode 100644 index 0000000000..744b0166f6 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/declarers/KafkaProviderDeclarer.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.declarers; + +import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher; +import com.huaweicloud.sermant.core.plugin.agent.matcher.MethodMatcher; +import com.huaweicloud.sermant.tag.transmission.interceptors.KafkaProducerInterceptor; + +/** + * kafka生产消息增强拦截点声明,支持1.x, 2.x, 3.x + * + * @author lilai + * @since 2023-07-18 + */ +public class KafkaProviderDeclarer extends AbstractPluginDeclarer { + /** + * 增强类的全限定名 + */ + private static final String ENHANCE_CLASSES = "org.apache.kafka.clients.producer.KafkaProducer"; + + /** + * 拦截类的全限定名 + */ + private static final String INTERCEPT_CLASS = KafkaProducerInterceptor.class.getCanonicalName(); + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("doSend"), INTERCEPT_CLASS) + }; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractClientInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractClientInterceptor.java new file mode 100644 index 0000000000..06fe101ef6 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractClientInterceptor.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; +import com.huaweicloud.sermant.core.utils.MapUtils; +import com.huaweicloud.sermant.core.utils.tag.TrafficTag; +import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; + +/** + * 客户端拦截器抽象类,获取当前线程的流量标签并透传至下游进程,适用于http客户端/rpc客户端/消息队列生产者 + * + * @author lilai + * @since 2023-07-18 + */ +public abstract class AbstractClientInterceptor extends AbstractInterceptor { + protected final TagTransmissionConfig tagTransmissionConfig; + + /** + * 构造器 + */ + public AbstractClientInterceptor() { + this.tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); + } + + @Override + public ExecuteContext before(ExecuteContext context) { + if (!tagTransmissionConfig.isEnabled()) { + return context; + } + + TrafficTag trafficTag = TrafficUtils.getTrafficTag(); + if (trafficTag == null || MapUtils.isEmpty(trafficTag.getTag())) { + return context; + } + + return doBefore(context); + } + + @Override + public ExecuteContext after(ExecuteContext context) { + return doAfter(context); + } + + /** + * 前置触发点 + * + * @param context 执行上下文 + * @return 执行上下文 + */ + protected abstract ExecuteContext doBefore(ExecuteContext context); + + /** + * 后置触发点 + * + * @param context 执行上下文 + * @return 执行上下文 + */ + protected abstract ExecuteContext doAfter(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractServerInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractServerInterceptor.java new file mode 100644 index 0000000000..f034643214 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/AbstractServerInterceptor.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; +import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; +import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; + +/** + * 服务端拦截器抽象类,获取跨进程的流量标签并在本进程传递,适用于http服务端/rpc服务端/消息队列消费者 + * + * @author lilai + * @since 2023-07-18 + */ +public abstract class AbstractServerInterceptor extends AbstractInterceptor { + protected final TagTransmissionConfig tagTransmissionConfig; + + /** + * 构造器 + */ + public AbstractServerInterceptor() { + this.tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); + } + + @Override + public ExecuteContext before(ExecuteContext context) { + if (!tagTransmissionConfig.isEnabled()) { + return context; + } + return doBefore(context); + } + + @Override + public ExecuteContext after(ExecuteContext context) { + return doAfter(context); + } + + /** + * 前置触发点 + * + * @param context 执行上下文 + * @return 执行上下文 + */ + protected abstract ExecuteContext doBefore(ExecuteContext context); + + /** + * 后置触发点 + * + * @param context 执行上下文 + * @return 执行上下文 + */ + protected abstract ExecuteContext doAfter(ExecuteContext context); +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/HttpClient4xInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/HttpClient4xInterceptor.java index ddc69e9081..2bfe1e56c1 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/HttpClient4xInterceptor.java +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/HttpClient4xInterceptor.java @@ -17,12 +17,8 @@ package com.huaweicloud.sermant.tag.transmission.interceptors; import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; -import com.huaweicloud.sermant.core.plugin.agent.interceptor.AbstractInterceptor; -import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; import com.huaweicloud.sermant.core.utils.CollectionUtils; -import com.huaweicloud.sermant.core.utils.tag.TrafficTag; import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; -import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; import org.apache.http.HttpRequest; @@ -34,32 +30,14 @@ * @author lilai * @since 2023-07-17 */ -public class HttpClient4xInterceptor extends AbstractInterceptor { - private final TagTransmissionConfig tagTransmissionConfig; - - /** - * 构造器 - */ - public HttpClient4xInterceptor() { - tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); - } - +public class HttpClient4xInterceptor extends AbstractClientInterceptor { @Override - public ExecuteContext before(ExecuteContext context) { - if (!tagTransmissionConfig.isEnabled()) { - return context; - } - - TrafficTag trafficTag = TrafficUtils.getTrafficTag(); - if (trafficTag == null) { - return context; - } - + public ExecuteContext doBefore(ExecuteContext context) { Object httpRequestObject = context.getArguments()[1]; if (httpRequestObject instanceof HttpRequest) { final HttpRequest httpRequest = (HttpRequest) httpRequestObject; for (String key : tagTransmissionConfig.getTagKeys()) { - List values = trafficTag.getTag().get(key); + List values = TrafficUtils.getTrafficTag().getTag().get(key); if (CollectionUtils.isEmpty(values)) { continue; } @@ -72,7 +50,7 @@ public ExecuteContext before(ExecuteContext context) { } @Override - public ExecuteContext after(ExecuteContext context) { + public ExecuteContext doAfter(ExecuteContext context) { return context; } } diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaConsumerRecordInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaConsumerRecordInterceptor.java new file mode 100644 index 0000000000..fc64cb2eb8 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaConsumerRecordInterceptor.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; +import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * kafka消息处理的拦截器,在获取消息内容时获取流量标签并放置于线程变量中,支持1.x, 2.x, 3.x + * + * @author lilai + * @since 2023-07-18 + */ +public class KafkaConsumerRecordInterceptor extends AbstractServerInterceptor { + private final TagTransmissionConfig tagTransmissionConfig; + + /** + * 构造器 + */ + public KafkaConsumerRecordInterceptor() { + tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + Object consumerRecordObject = context.getObject(); + if (consumerRecordObject instanceof ConsumerRecord) { + final ConsumerRecord consumerRecord = (ConsumerRecord) consumerRecordObject; + Map> tagMap = extractTagMap(consumerRecord); + TrafficUtils.updateTrafficTag(tagMap); + } + return context; + } + + private Map> extractTagMap(ConsumerRecord consumerRecord) { + Map> headerMap = convertHeaders(consumerRecord); + List tagKeys = tagTransmissionConfig.getTagKeys(); + Map> tagMap = new HashMap<>(); + for (String key : tagKeys) { + tagMap.put(key, headerMap.get(key)); + } + return tagMap; + } + + private static Map> convertHeaders(ConsumerRecord consumerRecord) { + Map> headerMap = new HashMap<>(); + for (Header header : consumerRecord.headers()) { + String key = header.key(); + String value = new String(header.value(), StandardCharsets.UTF_8); + headerMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value); + } + return headerMap; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + return context; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaProducerInterceptor.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaProducerInterceptor.java new file mode 100644 index 0000000000..1595068a12 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/java/com/huaweicloud/sermant/tag/transmission/interceptors/KafkaProducerInterceptor.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.interceptors; + +import com.huaweicloud.sermant.core.plugin.agent.entity.ExecuteContext; +import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; +import com.huaweicloud.sermant.core.utils.CollectionUtils; +import com.huaweicloud.sermant.core.utils.tag.TrafficUtils; +import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * kafka生产者发送消息的拦截器,支持1.x, 2.x, 3.x + * + * @author lilai + * @since 2023-07-18 + */ +public class KafkaProducerInterceptor extends AbstractClientInterceptor { + private final TagTransmissionConfig tagTransmissionConfig; + + /** + * 构造器 + */ + public KafkaProducerInterceptor() { + tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + Object producerRecordObject = context.getArguments()[0]; + if (producerRecordObject instanceof ProducerRecord) { + final ProducerRecord producerRecord = (ProducerRecord) producerRecordObject; + Headers headers = producerRecord.headers(); + for (String key : tagTransmissionConfig.getTagKeys()) { + List values = TrafficUtils.getTrafficTag().getTag().get(key); + if (CollectionUtils.isEmpty(values)) { + continue; + } + for (String value : values) { + headers.add(key, value.getBytes(StandardCharsets.UTF_8)); + } + } + } + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + return context; + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer index d31f054de7..5701a5ae6b 100644 --- a/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-plugin/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -15,4 +15,6 @@ # # com.huaweicloud.sermant.tag.transmission.declarers.HttpClient4xDeclarer +com.huaweicloud.sermant.tag.transmission.declarers.KafkaConsumerRecordDeclarer +com.huaweicloud.sermant.tag.transmission.declarers.KafkaProviderDeclarer diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-service/pom.xml b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/pom.xml new file mode 100644 index 0000000000..93cce2529b --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/pom.xml @@ -0,0 +1,47 @@ + + + + sermant-tag-transmission + com.huaweicloud.sermant + 1.0.0 + + 4.0.0 + + tag-transmission-service + + + 8 + 8 + false + service + + + + + org.yaml + snakeyaml + + + com.huaweicloud.sermant + sermant-agentcore-core + provided + + + com.huaweicloud.sermant + tag-transmission-plugin + 1.0.0 + compile + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + \ No newline at end of file diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/listener/TagConfigListener.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/listener/TagConfigListener.java new file mode 100644 index 0000000000..dbfff1e496 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/listener/TagConfigListener.java @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.listener; + +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.config.PluginConfigManager; +import com.huaweicloud.sermant.core.service.dynamicconfig.common.DynamicConfigEvent; +import com.huaweicloud.sermant.core.service.dynamicconfig.common.DynamicConfigEventType; +import com.huaweicloud.sermant.core.service.dynamicconfig.common.DynamicConfigListener; +import com.huaweicloud.sermant.tag.transmission.config.TagTransmissionConfig; + +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.error.YAMLException; +import org.yaml.snakeyaml.representer.Representer; + +import java.util.Locale; +import java.util.logging.Logger; + +/** + * 流量标签动态配置监听器 + * + * @author lilai + * @since 2023-07-20 + */ +public class TagConfigListener implements DynamicConfigListener { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private TagTransmissionConfig tagTransmissionConfig; + + private final Yaml yaml; + + /** + * 构造方法 + */ + public TagConfigListener() { + Representer representer = new Representer(); + representer.getPropertyUtils().setSkipMissingProperties(true); + this.yaml = new Yaml(representer); + this.tagTransmissionConfig = PluginConfigManager.getPluginConfig(TagTransmissionConfig.class); + } + + @Override + public void process(DynamicConfigEvent event) { + if (event.getEventType() == DynamicConfigEventType.DELETE) { + tagTransmissionConfig.setEnabled(Boolean.FALSE); + return; + } + try { + updateConfig(event); + } catch (YAMLException e) { + LOGGER.severe(String.format(Locale.ROOT, "Fail to convert dynamic tag config, %s", e.getMessage())); + } + } + + private void updateConfig(DynamicConfigEvent event) { + TagTransmissionConfig dynamicConfig = yaml.loadAs(event.getContent(), TagTransmissionConfig.class); + if (dynamicConfig == null) { + return; + } + tagTransmissionConfig.setEnabled(dynamicConfig.isEnabled()); + tagTransmissionConfig.setTagKeys(dynamicConfig.getTagKeys()); + LOGGER.info(String.format(Locale.ROOT, "Update tagTransmissionConfig, %s", + tagTransmissionConfig.toString())); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/service/TagConfigService.java b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/service/TagConfigService.java new file mode 100644 index 0000000000..5b8aa45ec8 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/java/com/huaweicloud/sermant/tag/transmission/service/TagConfigService.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.tag.transmission.service; + +import com.huaweicloud.sermant.core.common.LoggerFactory; +import com.huaweicloud.sermant.core.plugin.service.PluginService; +import com.huaweicloud.sermant.core.service.ServiceManager; +import com.huaweicloud.sermant.core.service.dynamicconfig.DynamicConfigService; +import com.huaweicloud.sermant.tag.transmission.listener.TagConfigListener; + +import java.util.Locale; +import java.util.logging.Logger; + +/** + * 流量透传动态配置监听服务 + * + * @author lilai + * @since 2023-07-20 + */ +public class TagConfigService implements PluginService { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final String DYNAMIC_CONFIG_KEY = "tag-config"; + + private static final String DYNAMIC_CONFIG_GROUP = "sermant/tag-transmission-plugin"; + + @Override + public void start() { + DynamicConfigService dynamicConfigService = ServiceManager.getService(DynamicConfigService.class); + dynamicConfigService.addConfigListener(DYNAMIC_CONFIG_KEY, DYNAMIC_CONFIG_GROUP, + new TagConfigListener(), true); + LOGGER.info(String.format(Locale.ROOT, + "Success to subscribe %s/%s", DYNAMIC_CONFIG_GROUP, DYNAMIC_CONFIG_KEY)); + } +} diff --git a/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.service.PluginService b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.service.PluginService new file mode 100644 index 0000000000..702e2b1af9 --- /dev/null +++ b/sermant-plugins/sermant-tag-transmission/tag-transmission-service/src/main/resources/META-INF/services/com.huaweicloud.sermant.core.plugin.service.PluginService @@ -0,0 +1,17 @@ +# +# Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.huaweicloud.sermant.tag.transmission.service.TagConfigService \ No newline at end of file