Skip to content

Commit

Permalink
Dynamic thread pool console function changes (#447) (#453)
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks authored Aug 5, 2022
1 parent b34c84f commit f241b74
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ public class ConfigController {

@GetMapping
public Result<ConfigInfoBase> detailConfigInfo(
@RequestParam("tpId") String tpId,
@RequestParam("itemId") String itemId,
@RequestParam("namespace") String namespace,
@RequestParam(value = "instanceId", required = false) String instanceId) {
@RequestParam("tpId") String tpId,
@RequestParam("itemId") String itemId,
@RequestParam("namespace") String namespace,
@RequestParam(value = "instanceId", required = false) String instanceId) {
ConfigAllInfo configAllInfo = configService.findConfigRecentInfo(tpId, itemId, namespace, instanceId);
return Results.success(configAllInfo);
}

@PostMapping
public Result<Boolean> publishConfig(@RequestParam(value = "identify", required = false) String identify,
@RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(identify, config);
configService.insertOrUpdate(identify, true, config);
return Results.success(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.design.observer.Observer;
import cn.hippo4j.common.design.observer.ObserverMessage;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.Md5Util;
import cn.hippo4j.config.event.LocalDataChangeEvent;
Expand All @@ -32,17 +33,19 @@
import cn.hippo4j.config.toolkit.MapUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION;

/**
* Config cache service.
*/
Expand Down Expand Up @@ -84,7 +87,7 @@ private synchronized static String getContentMd5IsNullPut(String groupKey, Strin
if (CONFIG_SERVICE == null) {
CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] params = groupKey.split("\\+");
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
if (config != null && StrUtil.isNotBlank(config.getTpId())) {
cacheItem = new CacheItem(groupKey, config);
Expand All @@ -98,7 +101,7 @@ public static String getContentMd5(String groupKey) {
if (CONFIG_SERVICE == null) {
CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] params = groupKey.split("\\+");
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
if (config == null || StringUtils.isEmpty(config.getTpId())) {
String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, tenantId :: %s", params[0], params[1], params[2]);
Expand All @@ -111,7 +114,7 @@ public static void updateMd5(String groupKey, String identify, String md5) {
CacheItem cache = makeSure(groupKey, identify);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
String[] params = groupKey.split("\\+");
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
cache.configAllInfo = config;
cache.lastModifiedTs = System.currentTimeMillis();
Expand Down Expand Up @@ -145,6 +148,22 @@ public static synchronized Integer getTotal() {
return total.get();
}

public static List<String> getIdentifyList(String tenantId, String itemId, String threadPoolId) {
List<String> identifyList = null;
String buildKey = Joiner.on(GROUP_KEY_DELIMITER).join(Lists.newArrayList(threadPoolId, itemId, tenantId));
List<String> keys = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, buildKey);
if (CollectionUtil.isNotEmpty(keys)) {
identifyList = new ArrayList(keys.size());
for (String each : keys) {
String[] keyArray = each.split(GROUP_KEY_DELIMITER_TRANSLATION);
if (keyArray != null && keyArray.length > 2) {
identifyList.add(keyArray[3]);
}
}
}
return identifyList;
}

/**
* Remove config cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public interface ConfigService {
* Insert or update.
*
* @param identify
* @param isChangeNotice
* @param configAllInfo
*/
void insertOrUpdate(String identify, ConfigAllInfo configAllInfo);

void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import cn.hippo4j.config.model.ConfigAllInfo;
import cn.hippo4j.config.model.ConfigInfoBase;
import cn.hippo4j.config.model.ConfigInstanceInfo;
import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigChangePublisher;
import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.config.toolkit.BeanUtil;
Expand Down Expand Up @@ -71,7 +72,6 @@ public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant
.eq(StrUtil.isNotBlank(tpId), ConfigAllInfo::getTpId, tpId)
.eq(StrUtil.isNotBlank(itemId), ConfigAllInfo::getItemId, itemId)
.eq(StrUtil.isNotBlank(tenantId), ConfigAllInfo::getTenantId, tenantId);

ConfigAllInfo configAllInfo = configInfoMapper.selectOne(wrapper);
return configAllInfo;
}
Expand All @@ -80,7 +80,6 @@ public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant
public ConfigAllInfo findConfigRecentInfo(String... params) {
ConfigAllInfo resultConfig;
ConfigAllInfo configInstance = null;

String instanceId = params[3];
if (StrUtil.isNotBlank(instanceId)) {
LambdaQueryWrapper<ConfigInstanceInfo> instanceQueryWrapper = Wrappers.lambdaQuery(ConfigInstanceInfo.class)
Expand All @@ -90,7 +89,6 @@ public ConfigAllInfo findConfigRecentInfo(String... params) {
.eq(ConfigInstanceInfo::getInstanceId, params[3])
.orderByDesc(ConfigInstanceInfo::getGmtCreate)
.last("LIMIT 1");

ConfigInstanceInfo instanceInfo = configInstanceMapper.selectOne(instanceQueryWrapper);
if (instanceInfo != null) {
String content = instanceInfo.getContent();
Expand All @@ -100,7 +98,6 @@ public ConfigAllInfo findConfigRecentInfo(String... params) {
configInstance.setMd5(Md5Util.getTpContentMd5(configInstance));
}
}

ConfigAllInfo configAllInfo = findConfigAllInfo(params[0], params[1], params[2]);
if (configAllInfo == null && configInstance == null) {
throw new ServiceException("Thread pool configuration is not defined.");
Expand All @@ -115,29 +112,27 @@ public ConfigAllInfo findConfigRecentInfo(String... params) {
resultConfig = configAllInfo;
}
}

return resultConfig;
}

@Override
public void insertOrUpdate(String identify, ConfigAllInfo configInfo) {
public void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configInfo) {
verification(identify);
LambdaQueryWrapper<ConfigAllInfo> queryWrapper = Wrappers.lambdaQuery(ConfigAllInfo.class)
.eq(ConfigAllInfo::getTenantId, configInfo.getTenantId())
.eq(ConfigInfoBase::getItemId, configInfo.getItemId())
.eq(ConfigInfoBase::getTpId, configInfo.getTpId());
ConfigAllInfo existConfig = configInfoMapper.selectOne(queryWrapper);

ConfigServiceImpl configService = ApplicationContextHolder.getBean(this.getClass());
configInfo.setCapacity(getQueueCapacityByType(configInfo));

ConditionUtil
.condition(
existConfig == null,
() -> configService.addConfigInfo(configInfo),
() -> configService.updateConfigInfo(identify, configInfo));

ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo)));
() -> configService.updateConfigInfo(identify, isChangeNotice, configInfo));
if (isChangeNotice) {
ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo)));
}
}

private void verification(String identify) {
Expand All @@ -150,7 +145,6 @@ private void verification(String identify) {
public Long addConfigInfo(ConfigAllInfo config) {
config.setContent(ContentUtil.getPoolContent(config));
config.setMd5(Md5Util.getTpContentMd5(config));

try {
// 当前为单体应用, 后续支持集群部署时切换分布式锁.
synchronized (ConfigService.class) {
Expand All @@ -159,7 +153,6 @@ public Long addConfigInfo(ConfigAllInfo config) {
.eq(ConfigAllInfo::getTpId, config.getTpId())
.eq(ConfigAllInfo::getDelFlag, DelEnum.NORMAL.getIntCode()));
Assert.isNull(configAllInfo, "线程池配置已存在.");

if (SqlHelper.retBool(configInfoMapper.insert(config))) {
return config.getId();
}
Expand All @@ -168,30 +161,36 @@ public Long addConfigInfo(ConfigAllInfo config) {
log.error("[db-error] message :: {}", ex.getMessage(), ex);
throw ex;
}

return null;
}

@LogRecord(bizNo = "{{#config.itemId}}_{{#config.tpId}}", category = "THREAD_POOL_UPDATE", success = "核心线程: {{#config.coreSize}}, 最大线程: {{#config.maxSize}}, 队列类型: {{#config.queueType}}, 队列容量: {{#config.capacity}}, 拒绝策略: {{#config.rejectedType}}", detail = "{{#config.toString()}}")
public void updateConfigInfo(String identify, ConfigAllInfo config) {
public void updateConfigInfo(String identify, boolean isChangeNotice, ConfigAllInfo config) {
LambdaUpdateWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaUpdate(ConfigAllInfo.class)
.eq(ConfigAllInfo::getTpId, config.getTpId())
.eq(ConfigAllInfo::getItemId, config.getItemId())
.eq(ConfigAllInfo::getTenantId, config.getTenantId());

config.setGmtCreate(null);
config.setContent(ContentUtil.getPoolContent(config));
config.setMd5(Md5Util.getTpContentMd5(config));

try {
// 创建线程池配置实例临时配置, 也可以当作历史配置, 不过针对的是单节点
if (StrUtil.isNotBlank(identify)) {
if (StringUtil.isNotBlank(identify)) {
ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class);
instanceInfo.setInstanceId(identify);
configInstanceMapper.insert(instanceInfo);
return;
} else if (StringUtil.isEmpty(identify) && isChangeNotice) {
List<String> identifyList = ConfigCacheService.getIdentifyList(config.getTenantId(), config.getItemId(), config.getTpId());
if (CollectionUtil.isNotEmpty(identifyList)) {
for (String each : identifyList) {
ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class);
instanceInfo.setInstanceId(each);
configInstanceMapper.insert(instanceInfo);
}
}
return;
}

configInfoMapper.update(config, wrapper);
} catch (Exception ex) {
log.error("[db-error] message :: {}", ex.getMessage(), ex);
Expand All @@ -217,14 +216,11 @@ private Integer getQueueCapacityByType(ConfigAllInfo config) {
queueCapacity = config.getCapacity();
break;
}

List<Integer> queueTypes = Stream.of(1, 2, 3, 6, 9).collect(Collectors.toList());
boolean setDefaultFlag = queueTypes.contains(config.getQueueType()) && (config.getCapacity() == null || Objects.equals(config.getCapacity(), 0));
if (setDefaultFlag) {
queueCapacity = 1024;
}

return queueCapacity;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public List<ThreadPoolRespDTO> getThreadPoolByItemId(String itemId) {

@Override
public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) {
configService.insertOrUpdate(identify, BeanUtil.convert(reqDTO, ConfigAllInfo.class));
configService.insertOrUpdate(identify, false, BeanUtil.convert(reqDTO, ConfigAllInfo.class));
}

@LogRecord(bizNo = "{{#reqDTO.itemId}}_{{#reqDTO.tpId}}", category = "THREAD_POOL_DELETE", success = "删除线程池: {{#reqDTO.tpId}}", detail = "{{#reqDTO.toString()}}")
Expand All @@ -98,7 +98,6 @@ public void alarmEnable(String id, Integer isAlarm) {
ConfigAllInfo configAllInfo = configInfoMapper.selectById(id);
configAllInfo.setIsAlarm(isAlarm);
// TODO: 是否报警变更, 虽然通知了客户端, 但是并没有在客户端实时生效, 需要考虑一个好的场景思路
configService.insertOrUpdate(null, configAllInfo);
configService.insertOrUpdate(null, false, configAllInfo);
}

}

0 comments on commit f241b74

Please sign in to comment.