Skip to content

Commit

Permalink
Merge pull request #1394 from lilai23/mq_prohibition
Browse files Browse the repository at this point in the history
消息队列禁消费插件: kafka以及配置相关UT
  • Loading branch information
Sherlockhan authored Dec 29, 2023
2 parents e580df6 + 0f31207 commit 9d1f090
Show file tree
Hide file tree
Showing 20 changed files with 1,175 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ServiceMeta implements BaseConfig {
/**
* 区域
*/
private String zone;
private String zone = DEFAULT;

/**
* 命名空间
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static ProhibitionConfig getGlobalConfig() {
*
* @return 局部配置
*/
public ProhibitionConfig getLocalConfig() {
public static ProhibitionConfig getLocalConfig() {
return localConfig;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.config;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.HashSet;

/**
* ProhibitionConfigManager单元测试
*
* @author lilai
* @since 2023-12-23
*/
public class ProhibitionConfigManagerTest {
private static ProhibitionConfig globalConfig;

private static ProhibitionConfig localConfig;

@BeforeClass
public static void setUp() {
globalConfig = new ProhibitionConfig();
HashSet<String> globalKafkaTopics = new HashSet<>();
globalKafkaTopics.add("testKafkaTopic-1");
HashSet<String> globalRocketMqTopics = new HashSet<>();
globalRocketMqTopics.add("testRocketMqTopic-1");
globalConfig.setKafkaTopics(globalKafkaTopics);
globalConfig.setRocketMqTopics(globalRocketMqTopics);
localConfig = new ProhibitionConfig();
HashSet<String> localKafkaTopics = new HashSet<>();
localKafkaTopics.add("testKafkaTopic-2");
HashSet<String> localRocketMqTopics = new HashSet<>();
localRocketMqTopics.add("testRocketMqTopic-2");
localConfig.setKafkaTopics(localKafkaTopics);
localConfig.setRocketMqTopics(localRocketMqTopics);
}

/**
* 测试Global和Local配置都开启的情况
*/
@Test
public void testGetKafkaProhibitionTopicsWithGlobalAndLocalConfigEnabled() {
globalConfig.setEnableKafkaProhibition(true);
localConfig.setEnableKafkaProhibition(true);
globalConfig.setEnableRocketMqProhibition(true);
localConfig.setEnableRocketMqProhibition(true);
ProhibitionConfigManager.updateGlobalConfig(globalConfig);
ProhibitionConfigManager.updateLocalConfig(localConfig);

Assert.assertEquals(globalConfig.getKafkaTopics(), ProhibitionConfigManager.getKafkaProhibitionTopics());
Assert.assertEquals(globalConfig.getRocketMqTopics(), ProhibitionConfigManager.getRocketMqProhibitionTopics());
}

/**
* 测试Global配置开启的情况
*/
@Test
public void testGetKafkaProhibitionTopicsWithJustGlobalConfigEnabled() {
globalConfig.setEnableKafkaProhibition(true);
localConfig.setEnableKafkaProhibition(false);
globalConfig.setEnableRocketMqProhibition(true);
localConfig.setEnableRocketMqProhibition(false);
ProhibitionConfigManager.updateGlobalConfig(globalConfig);
ProhibitionConfigManager.updateLocalConfig(localConfig);

Assert.assertEquals(globalConfig.getKafkaTopics(), ProhibitionConfigManager.getKafkaProhibitionTopics());
Assert.assertEquals(globalConfig.getRocketMqTopics(), ProhibitionConfigManager.getRocketMqProhibitionTopics());
}

/**
* 测试Local配置开启的情况
*/
@Test
public void testGetKafkaProhibitionTopicsWithJustLocalConfigEnabled() {
globalConfig.setEnableKafkaProhibition(false);
localConfig.setEnableKafkaProhibition(true);
globalConfig.setEnableRocketMqProhibition(false);
localConfig.setEnableRocketMqProhibition(true);
ProhibitionConfigManager.updateGlobalConfig(globalConfig);
ProhibitionConfigManager.updateLocalConfig(localConfig);

Assert.assertEquals(localConfig.getKafkaTopics(), ProhibitionConfigManager.getKafkaProhibitionTopics());
Assert.assertEquals(localConfig.getRocketMqTopics(), ProhibitionConfigManager.getRocketMqProhibitionTopics());
}

/**
* 测试Global和Local配置都关闭的情况
*/
@Test
public void testGetKafkaProhibitionTopicsWithBothConfigsDisabled() {
globalConfig.setEnableKafkaProhibition(false);
localConfig.setEnableKafkaProhibition(false);
globalConfig.setEnableRocketMqProhibition(false);
localConfig.setEnableRocketMqProhibition(false);
ProhibitionConfigManager.updateGlobalConfig(globalConfig);
ProhibitionConfigManager.updateLocalConfig(localConfig);

Assert.assertTrue(ProhibitionConfigManager.getKafkaProhibitionTopics().isEmpty());
Assert.assertTrue(ProhibitionConfigManager.getRocketMqProhibitionTopics().isEmpty());
}

/**
* 测试更新配置不为null的情况
*/
@Test
public void testUpdateConfigWithNonNullConfig() {
ProhibitionConfigManager.updateGlobalConfig(globalConfig);
ProhibitionConfigManager.updateLocalConfig(localConfig);
Assert.assertEquals(globalConfig, ProhibitionConfigManager.getGlobalConfig());
}

/**
* 测试更新配置为null的情况
*/
@Test
public void testUpdateConfigWithNullConfig() {
ProhibitionConfigManager.updateGlobalConfig(null);
ProhibitionConfigManager.updateLocalConfig(null);

Assert.assertEquals(0, ProhibitionConfigManager.getGlobalConfig().getKafkaTopics().size());
Assert.assertEquals(0, ProhibitionConfigManager.getGlobalConfig().getRocketMqTopics().size());
Assert.assertFalse(ProhibitionConfigManager.getGlobalConfig().isEnableKafkaProhibition());
Assert.assertFalse(ProhibitionConfigManager.getGlobalConfig().isEnableRocketMqProhibition());
Assert.assertEquals(0, ProhibitionConfigManager.getLocalConfig().getKafkaTopics().size());
Assert.assertEquals(0, ProhibitionConfigManager.getLocalConfig().getRocketMqTopics().size());
Assert.assertFalse(ProhibitionConfigManager.getLocalConfig().isEnableKafkaProhibition());
Assert.assertFalse(ProhibitionConfigManager.getLocalConfig().isEnableRocketMqProhibition());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (C) 2023-2023 Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.huaweicloud.sermant.kafka.cache;

import com.huaweicloud.sermant.core.config.ConfigManager;
import com.huaweicloud.sermant.core.plugin.config.ServiceMeta;
import com.huaweicloud.sermant.core.utils.NetworkUtils;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashSet;

/**
* KafkaConsumerCache单元测试
*
* @author lilai
* @since 2023-12-23
*/
public class KafkaConsumerCacheTest {
private MockedStatic<ConfigManager> configManagerMockedStatic;

@Before
public void setUp() {
configManagerMockedStatic = Mockito.mockStatic(ConfigManager.class);
configManagerMockedStatic.when(() -> ConfigManager.getConfig(ServiceMeta.class)).thenReturn(new ServiceMeta());
}

@After
public void tearDown() {
configManagerMockedStatic.close();
}

/**
* 测试addKafkaConsumer方法
*/
@Test
public void testAddKafkaConsumer() {
KafkaConsumer<?, ?> mockConsumer = Mockito.mock(KafkaConsumer.class);
int hashCode = mockConsumer.hashCode();
KafkaConsumerCache.INSTANCE.addKafkaConsumer(mockConsumer);

Assert.assertTrue(KafkaConsumerCache.INSTANCE.getCache().containsKey(hashCode));
Assert.assertEquals(mockConsumer, KafkaConsumerCache.INSTANCE.getCache().get(hashCode).getKafkaConsumer());
}

/**
* 测试convert方法
*/
@Test
public void testConvert() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
KafkaConsumer<?, ?> mockConsumer = Mockito.mock(KafkaConsumer.class);
Method method = KafkaConsumerCache.class.getDeclaredMethod("convert", KafkaConsumer.class);
method.setAccessible(true);
KafkaConsumerWrapper wrapper = (KafkaConsumerWrapper) method.invoke(KafkaConsumerCache.INSTANCE, mockConsumer);

Assert.assertNotNull(wrapper);
Assert.assertEquals(mockConsumer, wrapper.getKafkaConsumer());
Assert.assertEquals("default", wrapper.getApplication());
Assert.assertEquals("default", wrapper.getService());
Assert.assertEquals("default", wrapper.getZone());
Assert.assertEquals("default", wrapper.getProject());
Assert.assertEquals("", wrapper.getEnvironment());
Assert.assertFalse(wrapper.isAssign());
Assert.assertFalse(wrapper.getIsConfigChanged().get());
Assert.assertEquals(new HashSet<>(), wrapper.getOriginalTopics());
Assert.assertEquals(new HashSet<>(), wrapper.getOriginalPartitions());
Assert.assertEquals(NetworkUtils.getMachineIp(), wrapper.getServerAddress());
}
}
Loading

0 comments on commit 9d1f090

Please sign in to comment.