Skip to content

Commit

Permalink
add mq gray plugin declare/service
Browse files Browse the repository at this point in the history
Signed-off-by: chengyouling <[email protected]>
  • Loading branch information
chengyouling committed Sep 14, 2024
1 parent a61202c commit cd60332
Show file tree
Hide file tree
Showing 39 changed files with 1,861 additions and 346 deletions.
37 changes: 37 additions & 0 deletions sermant-plugins/sermant-mq-grayscale/mq-config-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>sermant-mq-grayscale</artifactId>
<groupId>io.sermant</groupId>
<version>1.0.0</version>
</parent>

<artifactId>mq-config-common</artifactId>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>io.sermant</groupId>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.config;

import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType;
import io.sermant.mq.grayscale.config.rocketmq.RocketMqConfigUtils;

/**
* grayscale config cache
*
* @author chengyouling
* @since 2024-09-12
**/
public class MqGrayConfigCache {
private static MqGrayscaleConfig cacheConfig = PluginConfigManager.getPluginConfig(MqGrayscaleConfig.class);

private MqGrayConfigCache() {
}

/**
* get cache mqGrayscaleConfig
*
* @return mqGrayscaleConfig
*/
public static MqGrayscaleConfig getCacheConfig() {
return cacheConfig;
}

/**
* set cache mqGrayscaleConfig
*
* @param config mqGrayscaleConfig
* @param eventType eventType
*/
public static void setCacheConfig(MqGrayscaleConfig config, DynamicConfigEventType eventType) {
if (eventType == DynamicConfigEventType.CREATE) {
cacheConfig = config;
RocketMqConfigUtils.updateChangeFlag();
RocketMqConfigUtils.recordTrafficTagsSet(config);
return;
}
boolean isAllowRefresh = isAllowRefreshChangeFlag(cacheConfig, config);
if (isAllowRefresh) {
cacheConfig.updateGrayscaleConfig(MqGrayConfigCache.getCacheConfig());
RocketMqConfigUtils.updateChangeFlag();
RocketMqConfigUtils.recordTrafficTagsSet(config);
}
}

/**
* clear cache mqGrayscaleConfig
*/
public static void clearCacheConfig() {
cacheConfig = new MqGrayscaleConfig();
RocketMqConfigUtils.updateChangeFlag();
}

/**
* only traffic label changes allow refresh tag change map to rebuild SQL92 query statement,
* because if the serviceMeta changed, the gray consumer cannot be matched and becomes a base consumer
* so, if you need to change the env tag, restart all services.
*
* @param resource cache config
* @param target cache config
* @return boolean
*/
private static boolean isAllowRefreshChangeFlag(MqGrayscaleConfig resource, MqGrayscaleConfig target) {
if (resource.isEnabled() != target.isEnabled()) {
return true;
}
if (resource.isBaseExcludeGroupTagsChanged(target)) {
return true;
}
if (resource.isConsumerModeChanged(target)) {
return true;
}
return !resource.buildAllTrafficTagInfoToStr().equals(target.buildAllTrafficTagInfoToStr());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.sermant.core.config.common.ConfigTypeKey;
import io.sermant.core.plugin.config.PluginConfig;
import io.sermant.mq.grayscale.utils.SubscriptionDataUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -35,6 +34,11 @@
**/
@ConfigTypeKey("grayscale.mq.config")
public class MqGrayscaleConfig implements PluginConfig {
/**
* afa symbol
*/
private static final String AFA_SYMBOL = "@";

private boolean enabled = false;

private List<GrayTagItem> grayscale = new ArrayList<>();
Expand Down Expand Up @@ -122,12 +126,12 @@ public String buildAllTrafficTagInfoToStr() {
StringBuilder sb = new StringBuilder();
for (GrayTagItem item : grayscale) {
if (sb.length() > 0) {
sb.append(SubscriptionDataUtils.AFA_SYMBOL);
sb.append(AFA_SYMBOL);
}
sb.append(item.getConsumerGroupTag());
for (Map.Entry<String, String> entry : item.getTrafficTag().entrySet()) {
sb.append(entry.getKey())
.append(SubscriptionDataUtils.AFA_SYMBOL)
.append(AFA_SYMBOL)
.append(entry.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.config.rocketmq;

import io.sermant.mq.grayscale.config.GrayTagItem;
import io.sermant.mq.grayscale.config.MqGrayscaleConfig;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* rocketmq config utils
*
* @author chengyouling
* @since 2024-09-13
*/
public class RocketMqConfigUtils {
/**
* base instance subscript gray tag change flags
* key: namesrvAddr@topic@consumerGroup
* value: change flag
*/
private static final Map<String, Boolean> BASE_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>();

/**
* gray instance subscript gray tag change flags
* key: namesrvAddr@topic@consumerGroup
* value: change flag
*/
private static final Map<String, Boolean> GRAY_GROUP_TAG_CHANGE_MAP = new ConcurrentHashMap<>();

/**
* all traffic tags that's been set, using for sql92 expression reset
*/
private static final Set<String> GRAY_TAGS_SET = new HashSet<>();

private RocketMqConfigUtils() {
}

/**
* set base consumer address@topic@group correspondents change flag
*
* @param subscribeScope subscribeScope
* @param flag flag
*/
public static void setBaseGroupTagChangeMap(String subscribeScope, boolean flag) {
BASE_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag);
}

/**
* set gray consumer address@topic@group correspondents change flag
*
* @param subscribeScope subscribeScope
* @param flag flag
*/
public static void setGrayGroupTagChangeMap(String subscribeScope, boolean flag) {
GRAY_GROUP_TAG_CHANGE_MAP.put(subscribeScope, flag);
}

/**
* get base consumer address@topic@group correspondents change flag
*
* @param subscribeScope subscribeScope
* @return changeFlag
*/
public static boolean getBaseGroupTagChangeMap(String subscribeScope) {
return BASE_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null && BASE_GROUP_TAG_CHANGE_MAP.get(subscribeScope);
}

/**
* get gray consumer address@topic@group correspondents change flag
*
* @param subscribeScope subscribeScope
* @return changeFlag
*/
public static boolean getGrayGroupTagChangeMap(String subscribeScope) {
return GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope) != null && GRAY_GROUP_TAG_CHANGE_MAP.get(subscribeScope);
}

/**
* update all consumer gray tag change flag
*/
public static void updateChangeFlag() {
BASE_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true);
GRAY_GROUP_TAG_CHANGE_MAP.replaceAll((k, v) -> true);
}

/**
* records traffic labels for historical and current configuration settings
*
* @param config config
*/
public static void recordTrafficTagsSet(MqGrayscaleConfig config) {
for (GrayTagItem item : config.getGrayscale()) {
if (!item.getTrafficTag().isEmpty()) {
GRAY_TAGS_SET.addAll(item.getTrafficTag().keySet());
}
}
}

/**
* get all config set gray tags
*
* @return all config set gray tags
*/
public static Set<String> getGrayTagsSet() {
return GRAY_TAGS_SET;
}
}
47 changes: 47 additions & 0 deletions sermant-plugins/sermant-mq-grayscale/mq-config-service/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>sermant-mq-grayscale</artifactId>
<groupId>io.sermant</groupId>
<version>1.0.0</version>
</parent>

<artifactId>mq-config-service</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<package.plugin.type>service</package.plugin.type>
</properties>

<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>io.sermant</groupId>
<artifactId>sermant-agentcore-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.sermant</groupId>
<artifactId>mq-config-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* limitations under the License.
*/

package io.sermant.mq.grayscale.config;
package io.sermant.mq.grayscale.listener;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType;
import io.sermant.core.utils.StringUtils;
import io.sermant.mq.grayscale.utils.MqGrayscaleConfigUtils;
import io.sermant.mq.grayscale.config.MqGrayConfigCache;
import io.sermant.mq.grayscale.config.MqGrayscaleConfig;

import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
Expand Down Expand Up @@ -64,15 +65,15 @@ public void handle(DynamicConfigEvent event) {
return;
}
if (event.getEventType() == DynamicConfigEventType.DELETE) {
MqGrayscaleConfigUtils.resetGrayscaleConfig();
MqGrayConfigCache.clearCacheConfig();
return;
}
if (!StringUtils.isEmpty(event.getContent())) {
LOGGER.info(String.format(Locale.ROOT, "mqGrayscale [%s] dynamicConfig context: %s",
event.getGroup(), event.getContent()));
MqGrayscaleConfig config = yaml.loadAs(event.getContent(), MqGrayscaleConfig.class);
if (config != null) {
MqGrayscaleConfigUtils.setGrayscaleConfig(config, event.getEventType());
MqGrayConfigCache.setCacheConfig(config, event.getEventType());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.sermant.mq.grayscale.config;
package io.sermant.mq.grayscale.listener;

import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigListener;
Expand Down
Loading

0 comments on commit cd60332

Please sign in to comment.