Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

流量标签透传特性:支持跨线程传递标签 #1268

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ public static MethodMatcher isMemberMethod() {
return methodTypeMatches(MethodType.MEMBER);
}

/**
* 匹配公有方法,见{@link #methodTypeMatches}
*
* @return 方法匹配器对象
*/
public static MethodMatcher isPublicMethod() {
return methodTypeMatches(MethodType.PUBLIC);
}

/**
* 匹配符合类型的方法,包括静态方法,构造函数和成员方法三种
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public boolean match(MethodDescription methodDescription) {
public boolean match(MethodDescription methodDescription) {
return !methodDescription.isStatic() && !methodDescription.isConstructor();
}
},
/**
* 公有方法
*/
PUBLIC() {
@Override
public boolean match(MethodDescription methodDescription) {
return methodDescription.isPublic();
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.core.utils.tag;

import java.util.List;
import java.util.Map;

/**
* 流量相关新信息
*
* @author lilai
* @since 2023-07-26
*/
public class TrafficData extends TrafficTag {
private final String path;

private final String httpMethod;

/**
* 构造方法
*
* @param header 请求头/attachments
* @param path 请求路径
* @param httpMethod 请求方法
*/
public TrafficData(Map<String, List<String>> header, String path, String httpMethod) {
super(header);
this.path = path;
this.httpMethod = httpMethod;
}

public String getPath() {
return path;
}

public String getHttpMethod() {
return httpMethod;
}

@Override
public String toString() {
return "{"
+ "path='" + path + '\''
+ ", httpMethod='" + httpMethod + '\''
+ ", tag='" + getTag() + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,59 @@
* @since 2023-07-17
*/
public class TrafficUtils {
private static final ThreadLocal<TrafficTag> TAG = new ThreadLocal<>();
private static ThreadLocal<TrafficTag> tag = new ThreadLocal<>();

private static ThreadLocal<TrafficData> data = new ThreadLocal<>();

private TrafficUtils() {
}

/**
* 如果开启在new Thread时跨线程传递标签,需要把ThreadLocal初始化为InheritableThreadLocal
*/
public static void setInheritableThreadLocal() {
if (!(tag instanceof InheritableThreadLocal)) {
tag = new InheritableThreadLocal<>();
}

if (!(data instanceof InheritableThreadLocal)) {
data = new InheritableThreadLocal<>();
}
}

/**
* 获取线程中的流量标签
*
* @return 流量标签
*/
public static TrafficTag getTrafficTag() {
return TAG.get();
return tag.get();
}

/**
* 获取线程中的流量信息
*
* @return 流量信息
*/
public static TrafficData getTrafficData() {
return data.get();
}

/**
* 更新线程中的流量标签
*
* @param tag 流量标签map
* @param tagMap 流量标签map
*/
public static void updateTrafficTag(Map<String, List<String>> tag) {
if (MapUtils.isEmpty(tag)) {
public static void updateTrafficTag(Map<String, List<String>> tagMap) {
if (MapUtils.isEmpty(tagMap)) {
return;
}
TrafficTag trafficTag = TAG.get();
TrafficTag trafficTag = TrafficUtils.tag.get();
if (trafficTag == null) {
TAG.set(new TrafficTag(tag));
TrafficUtils.tag.set(new TrafficTag(tagMap));
return;
}
trafficTag.updateTag(tag);
trafficTag.updateTag(tagMap);
}

/**
Expand All @@ -68,13 +92,29 @@ public static void setTrafficTag(TrafficTag trafficTag) {
if (trafficTag == null) {
return;
}
TAG.set(trafficTag);
tag.set(trafficTag);
}

/**
* 删除线程变量
*/
public static void removeTrafficTag() {
TAG.remove();
tag.remove();
}

/**
* 流量信息存入线程变量
*
* @param value 线程变量
*/
public static void setTrafficData(TrafficData value) {
data.set(value);
}

/**
* 删除流量信息
*/
public static void removeTrafficData() {
data.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void close() throws Exception {
public void doBefore() throws Exception {
REGISTER_CONFIG.setEnableSpringRegister(true);
REGISTER_CONFIG.setOpenMigration(true);
RegisterDynamicConfig.INSTANCE.setClose(false);
final ExecuteContext context = interceptor.before(buildContext());
Assert.assertFalse(context.isSkip());
RegisterDynamicConfig.INSTANCE.setClose(true);
Expand Down
16 changes: 14 additions & 2 deletions sermant-plugins/sermant-tag-transmission/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
tag.transmission.plugin:
# 流量标签在各种通道间(http/rpc/消息队列等)传递的配置
tag.transmission.config:
# 是否开启流量标签透传
enabled: true
tagKeys: [id,name]
# 需要透传的流量标签的key
tagKeys: [id,name]

# 跨线程传递标签的配置,该能力可单独使用
crossthread.config:
# 是否在直接new Thread时传递标签
enabled-thread: true
# 是否在非定时线程池中传递标签
enabled-thread-pool: true
# 是否在定时线程池的schedule/scheduleAtFixedRate/scheduleWithFixedDelay方法中传递标签
enabled-scheduler: true
lilai23 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.tag.transmission.config;

import com.huaweicloud.sermant.core.config.common.ConfigFieldKey;
import com.huaweicloud.sermant.core.config.common.ConfigTypeKey;
import com.huaweicloud.sermant.core.plugin.config.PluginConfig;

/**
* 跨线程传递开关配置
*
* @author lilai
* @since 2023-08-02
*/
@ConfigTypeKey("crossthread.config")
public class CrossThreadConfig implements PluginConfig {
/**
* 是否在非定时线程池中传递标签
*/
@ConfigFieldKey("enabled-thread-pool")
private boolean enabledThreadPool;

/**
* 是否在定时线程池的schedule/scheduleAtFixedRate/scheduleWithFixedDelay方法中传递标签
*/
@ConfigFieldKey("enabled-scheduler")
private boolean enabledScheduler;

/**
* 是否在直接new Thread时传递标签
*/
@ConfigFieldKey("enabled-thread")
private boolean enabledThread;

public boolean isEnabledThread() {
return enabledThread;
}

public void setEnabledThread(boolean enabledThread) {
this.enabledThread = enabledThread;
}

public boolean isEnabledThreadPool() {
return enabledThreadPool;
}

public void setEnabledThreadPool(boolean enabledThreadPool) {
this.enabledThreadPool = enabledThreadPool;
}

public boolean isEnabledScheduler() {
return enabledScheduler;
}

public void setEnabledScheduler(boolean enabledScheduler) {
this.enabledScheduler = enabledScheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.util.List;

/**
* 流量标签透传插件配置
* 流量标签透传配置
*
* @author lilai
* @since 2023-07-17
*/
@ConfigTypeKey("tag.transmission.plugin")
@ConfigTypeKey("tag.transmission.config")
public class TagTransmissionConfig implements PluginConfig {
/**
* 是否开启适配
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @author lilai
* @since 2023-07-18
*/
public class KafkaProviderDeclarer extends AbstractPluginDeclarer {
public class KafkaProducerDeclarer extends AbstractPluginDeclarer {
/**
* 增强类的全限定名
*/
Expand Down
Loading