iterator = Splitter.on(File.separatorChar).split(path).iterator();
+ boolean found = false;
+ File dir = dstDir;
+ while (iterator.hasNext()) {
+ String current = iterator.next();
+ if (!found && flag.equals(current)) {
+ found = true;
+ if (into) {
+ dir = new File(dir, flag);
+ if (!dir.exists()) {
+ Assert.assertTrue(dir.mkdirs());
+ }
+ }
+ continue;
+ }
+
+ if (found) {
+ if (!iterator.hasNext()) {
+ dir = new File(dir, current);
+ } else {
+ dir = new File(dir, current);
+ if (!dir.exists()) {
+ Assert.assertTrue(dir.mkdir());
+ }
+ }
+ }
+ }
+
+ Assert.assertTrue(dir.createNewFile());
+ byte[] buffer = new byte[4096];
+ BufferedInputStream bis = new BufferedInputStream(src);
+ int len = 0;
+ try (BufferedOutputStream bos = new BufferedOutputStream(Files.newOutputStream(dir.toPath()))) {
+ while ((len = bis.read(buffer)) > 0) {
+ bos.write(buffer, 0, len);
+ }
+ }
+ }
+
+ public static void recursiveDelete(File file) {
+ if (file.isFile()) {
+ file.delete();
+ } else {
+ File[] files = file.listFiles();
+ if (null != files) {
+ for (File f : files) {
+ recursiveDelete(f);
+ }
+ }
+ file.delete();
+ }
+ }
+
+ public static File copyResources(String folder) throws IOException {
+ return copyResources(folder, false);
+ }
+
+ public static File copyResources(String folder, boolean into) throws IOException {
+ File home = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString().replace('-', '_'));
+ if (!home.exists()) {
+ Assert.assertTrue(home.mkdirs());
+ }
+ PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(AclTestHelper.class.getClassLoader());
+ Resource[] resources = resolver.getResources(String.format("classpath:%s/**/*", folder));
+ for (Resource resource : resources) {
+ if (!resource.isReadable()) {
+ continue;
+ }
+ String description = resource.getDescription();
+ int start = description.indexOf('[');
+ int end = description.lastIndexOf(']');
+ String path = description.substring(start + 1, end);
+ try (InputStream inputStream = resource.getInputStream()) {
+ copyTo(path, inputStream, home, folder, into);
+ }
+ }
+ return home;
+ }
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
new file mode 100644
index 00000000000..5193457146d
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl.plain;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclConstants;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * In this class, we'll test the following scenarios, each containing several consecutive operations on ACL,
+ *
like updating and deleting ACL, changing config files and checking validations.
+ *
Case 1: Only conf/plain_acl.yml exists;
+ *
Case 2: Only conf/acl/plain_acl.yml exists;
+ *
Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists.
+ */
+public class PlainAccessControlFlowTest {
+ public static final String DEFAULT_TOPIC = "topic-acl";
+
+ public static final String DEFAULT_GROUP = "GID_acl";
+
+ public static final String DEFAULT_PRODUCER_AK = "ak11111";
+ public static final String DEFAULT_PRODUCER_SK = "1234567";
+
+ public static final String DEFAULT_CONSUMER_SK = "7654321";
+ public static final String DEFAULT_CONSUMER_AK = "ak22222";
+
+ public static final String DEFAULT_GLOBAL_WHITE_ADDR = "172.16.123.123";
+ public static final List DEFAULT_GLOBAL_WHITE_ADDRS_LIST = Collections.singletonList(DEFAULT_GLOBAL_WHITE_ADDR);
+
+ @Test
+ public void testEmptyAclFolderCase() throws NoSuchFieldException, IllegalAccessException,
+ IOException {
+ String folder = "empty_acl_folder_conf";
+ File home = AclTestHelper.copyResources(folder);
+ System.setProperty("rocketmq.home.dir", home.getAbsolutePath());
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ checkDefaultAclFileExists();
+ testValidationAfterConsecutiveUpdates(plainAccessValidator);
+ testValidationAfterConfigFileChanged(plainAccessValidator);
+ AclTestHelper.recursiveDelete(home);
+ }
+
+ @Test
+ public void testOnlyAclFolderCase() throws NoSuchFieldException, IllegalAccessException, IOException {
+ String folder = "only_acl_folder_conf";
+ File home = AclTestHelper.copyResources(folder);
+ System.setProperty("rocketmq.home.dir", home.getAbsolutePath());
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ checkDefaultAclFileExists();
+ testValidationAfterConsecutiveUpdates(plainAccessValidator);
+ testValidationAfterConfigFileChanged(plainAccessValidator);
+ AclTestHelper.recursiveDelete(home);
+ }
+
+ @Test
+ public void testBothAclFileAndFolderCase() throws NoSuchFieldException, IllegalAccessException,
+ IOException {
+ String folder = "both_acl_file_folder_conf";
+ File root = AclTestHelper.copyResources(folder);
+ System.setProperty("rocketmq.home.dir", root.getAbsolutePath());
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ checkDefaultAclFileExists();
+ testValidationAfterConsecutiveUpdates(plainAccessValidator);
+ testValidationAfterConfigFileChanged(plainAccessValidator);
+ AclTestHelper.recursiveDelete(root);
+ }
+
+ private void testValidationAfterConfigFileChanged(
+ PlainAccessValidator plainAccessValidator) throws NoSuchFieldException, IllegalAccessException {
+ PlainAccessConfig producerAccessConfig = generateProducerAccessConfig();
+ PlainAccessConfig consumerAccessConfig = generateConsumerAccessConfig();
+ List plainAccessConfigList = new LinkedList<>();
+ plainAccessConfigList.add(producerAccessConfig);
+ plainAccessConfigList.add(consumerAccessConfig);
+ PlainAccessData ymlMap = new PlainAccessData();
+ ymlMap.setAccounts(plainAccessConfigList);
+
+ // write prepared PlainAccessConfigs to file
+ final String aclConfigFile = System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml";
+ AclUtils.writeDataObject(aclConfigFile, ymlMap);
+
+ loadConfigFile(plainAccessValidator, aclConfigFile);
+
+ // check if added successfully
+ final AclConfig allAclConfig = plainAccessValidator.getAllAclConfig();
+ final List plainAccessConfigs = allAclConfig.getPlainAccessConfigs();
+ checkPlainAccessConfig(producerAccessConfig, plainAccessConfigs);
+ checkPlainAccessConfig(consumerAccessConfig, plainAccessConfigs);
+
+ //delete consumer account
+ plainAccessConfigList.remove(consumerAccessConfig);
+ AclUtils.writeDataObject(aclConfigFile, ymlMap);
+
+ loadConfigFile(plainAccessValidator, aclConfigFile);
+
+ // sending messages will be successful using prepared credentials
+ SessionCredentials producerCredential = new SessionCredentials(DEFAULT_PRODUCER_AK, DEFAULT_PRODUCER_SK);
+ AclClientRPCHook producerHook = new AclClientRPCHook(producerCredential);
+ validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
+ validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
+
+ // consuming messages will be failed for account has been deleted
+ SessionCredentials consumerCredential = new SessionCredentials(DEFAULT_CONSUMER_AK, DEFAULT_CONSUMER_SK);
+ AclClientRPCHook consumerHook = new AclClientRPCHook(consumerCredential);
+ boolean isConsumeFailed = false;
+ try {
+ validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, consumerHook, "", plainAccessValidator);
+ } catch (AclException e) {
+ isConsumeFailed = true;
+ }
+ Assert.assertTrue("Message should not be consumed after account deleted", isConsumeFailed);
+
+ }
+
+ private void testValidationAfterConsecutiveUpdates(
+ PlainAccessValidator plainAccessValidator) throws NoSuchFieldException, IllegalAccessException {
+ PlainAccessConfig producerAccessConfig = generateProducerAccessConfig();
+ plainAccessValidator.updateAccessConfig(producerAccessConfig);
+
+ PlainAccessConfig consumerAccessConfig = generateConsumerAccessConfig();
+ plainAccessValidator.updateAccessConfig(consumerAccessConfig);
+
+ plainAccessValidator.updateGlobalWhiteAddrsConfig(DEFAULT_GLOBAL_WHITE_ADDRS_LIST, null);
+
+ // check if the above config updated successfully
+ final AclConfig allAclConfig = plainAccessValidator.getAllAclConfig();
+ final List plainAccessConfigs = allAclConfig.getPlainAccessConfigs();
+ checkPlainAccessConfig(producerAccessConfig, plainAccessConfigs);
+ checkPlainAccessConfig(consumerAccessConfig, plainAccessConfigs);
+
+ Assert.assertEquals(DEFAULT_GLOBAL_WHITE_ADDRS_LIST, allAclConfig.getGlobalWhiteAddrs());
+
+ // check sending and consuming messages
+ SessionCredentials producerCredential = new SessionCredentials(DEFAULT_PRODUCER_AK, DEFAULT_PRODUCER_SK);
+ AclClientRPCHook producerHook = new AclClientRPCHook(producerCredential);
+ validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
+ validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
+
+ SessionCredentials consumerCredential = new SessionCredentials(DEFAULT_CONSUMER_AK, DEFAULT_CONSUMER_SK);
+ AclClientRPCHook consumerHook = new AclClientRPCHook(consumerCredential);
+ validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, consumerHook, "", plainAccessValidator);
+
+ // load from file
+ loadConfigFile(plainAccessValidator,
+ System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml");
+ SessionCredentials unmatchedCredential = new SessionCredentials("non_exists_sk", "non_exists_sk");
+ AclClientRPCHook dummyHook = new AclClientRPCHook(unmatchedCredential);
+ validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, dummyHook, DEFAULT_GLOBAL_WHITE_ADDR, plainAccessValidator);
+ validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, dummyHook, DEFAULT_GLOBAL_WHITE_ADDR, plainAccessValidator);
+ validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, dummyHook, DEFAULT_GLOBAL_WHITE_ADDR, plainAccessValidator);
+
+ //recheck after reloading
+ validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
+ validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
+ validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, consumerHook, "", plainAccessValidator);
+
+ }
+
+ private void loadConfigFile(PlainAccessValidator plainAccessValidator,
+ String configFileName) throws NoSuchFieldException, IllegalAccessException {
+ Class clazz = PlainAccessValidator.class;
+ Field f = clazz.getDeclaredField("aclPlugEngine");
+ f.setAccessible(true);
+ PlainPermissionManager aclPlugEngine = (PlainPermissionManager) f.get(plainAccessValidator);
+ aclPlugEngine.load(configFileName);
+ }
+
+ private PlainAccessConfig generateConsumerAccessConfig() {
+ PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig();
+ plainAccessConfig2.setAccessKey(DEFAULT_CONSUMER_AK);
+ plainAccessConfig2.setSecretKey(DEFAULT_CONSUMER_SK);
+ plainAccessConfig2.setAdmin(false);
+ plainAccessConfig2.setDefaultTopicPerm(AclConstants.DENY);
+ plainAccessConfig2.setDefaultGroupPerm(AclConstants.DENY);
+ plainAccessConfig2.setTopicPerms(Collections.singletonList(DEFAULT_TOPIC + "=" + AclConstants.SUB));
+ plainAccessConfig2.setGroupPerms(Collections.singletonList(DEFAULT_GROUP + "=" + AclConstants.SUB));
+ return plainAccessConfig2;
+ }
+
+ private PlainAccessConfig generateProducerAccessConfig() {
+ PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+ plainAccessConfig.setAccessKey(DEFAULT_PRODUCER_AK);
+ plainAccessConfig.setSecretKey(DEFAULT_PRODUCER_SK);
+ plainAccessConfig.setAdmin(false);
+ plainAccessConfig.setDefaultTopicPerm(AclConstants.DENY);
+ plainAccessConfig.setDefaultGroupPerm(AclConstants.DENY);
+ plainAccessConfig.setTopicPerms(Collections.singletonList(DEFAULT_TOPIC + "=" + AclConstants.PUB));
+ return plainAccessConfig;
+ }
+
+ public void validatePullMessage(String topic,
+ String group,
+ AclClientRPCHook aclClientRPCHook,
+ String remoteAddr,
+ PlainAccessValidator plainAccessValidator) {
+ PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
+ pullMessageRequestHeader.setTopic(topic);
+ pullMessageRequestHeader.setConsumerGroup(group);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,
+ pullMessageRequestHeader);
+ aclClientRPCHook.doBeforeRequest(remoteAddr, remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
+ RemotingCommand.decode(buf), remoteAddr);
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw RemotingCommandException");
+ }
+ }
+
+ public void validateSendMessage(int requestCode,
+ String topic,
+ AclClientRPCHook aclClientRPCHook,
+ String remoteAddr,
+ PlainAccessValidator plainAccessValidator) {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic(topic);
+ RemotingCommand remotingCommand;
+ if (RequestCode.SEND_MESSAGE == requestCode) {
+ remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ } else {
+ remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2,
+ SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
+ }
+
+ aclClientRPCHook.doBeforeRequest(remoteAddr, remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
+ RemotingCommand.decode(buf), remoteAddr);
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+ Assert.fail("Should not throw RemotingCommandException");
+ }
+ }
+
+ private void checkPlainAccessConfig(final PlainAccessConfig plainAccessConfig,
+ final List plainAccessConfigs) {
+ for (PlainAccessConfig config : plainAccessConfigs) {
+ if (config.getAccessKey().equals(plainAccessConfig.getAccessKey())) {
+ Assert.assertEquals(plainAccessConfig.getSecretKey(), config.getSecretKey());
+ Assert.assertEquals(plainAccessConfig.isAdmin(), config.isAdmin());
+ Assert.assertEquals(plainAccessConfig.getDefaultGroupPerm(), config.getDefaultGroupPerm());
+ Assert.assertEquals(plainAccessConfig.getDefaultGroupPerm(), config.getDefaultGroupPerm());
+ Assert.assertEquals(plainAccessConfig.getWhiteRemoteAddress(), config.getWhiteRemoteAddress());
+ if (null != plainAccessConfig.getTopicPerms()) {
+ Assert.assertNotNull(config.getTopicPerms());
+ Assert.assertTrue(config.getTopicPerms().containsAll(plainAccessConfig.getTopicPerms()));
+ }
+ if (null != plainAccessConfig.getGroupPerms()) {
+ Assert.assertNotNull(config.getGroupPerms());
+ Assert.assertTrue(config.getGroupPerms().containsAll(plainAccessConfig.getGroupPerms()));
+ }
+ }
+ }
+ }
+
+ private void checkDefaultAclFileExists() {
+ boolean isExists = Files.exists(Paths.get(System.getProperty("rocketmq.home.dir")
+ + File.separator + "conf" + File.separator + "plain_acl.yml"));
+ Assert.assertTrue("default acl config file should exist", isExists);
+ }
+
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
index 62d98572053..ef0cffbdcc8 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.acl.plain;
+import com.google.common.base.Joiner;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,24 +35,25 @@
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.AclConfig;
-import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
-import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
-import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -61,11 +64,13 @@ public class PlainAccessValidatorTest {
private AclClientRPCHook aclClient;
private SessionCredentials sessionCredentials;
+ private File confHome;
@Before
- public void init() {
- File file = new File("src/test/resources");
- System.setProperty("rocketmq.home.dir", file.getAbsolutePath());
+ public void init() throws IOException {
+ String folder = "conf";
+ confHome = AclTestHelper.copyResources(folder, true);
+ System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath());
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
@@ -74,6 +79,11 @@ public void init() {
aclClient = new AclClientRPCHook(sessionCredentials);
}
+ @After
+ public void cleanUp() {
+ AclTestHelper.recursiveDelete(confHome);
+ }
+
@Test
public void contentTest() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
@@ -110,7 +120,7 @@ public void validateTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -132,7 +142,28 @@ public void validateSendMessageTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+ @Test
+ public void validateSendMessageToRetryTopicTest() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic(MixAll.getRetryTopic("groupB"));
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -144,7 +175,7 @@ public void validateSendMessageTest() {
@Test
public void validateSendMessageV2Test() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicC");
+ messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
aclClient.doBeforeRequest("", remotingCommand);
@@ -153,7 +184,28 @@ public void validateSendMessageV2Test() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
+ plainAccessValidator.validate(accessResource);
+ } catch (RemotingCommandException e) {
+ e.printStackTrace();
+
+ Assert.fail("Should not throw IOException");
+ }
+ }
+
+ @Test
+ public void validateSendMessageV2ToRetryTopicTest() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic(MixAll.getRetryTopic("groupC"));
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
+ aclClient.doBeforeRequest("", remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ try {
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6:9876");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -182,7 +234,7 @@ public void validateForAdminCommandWithOutAclRPCHook() {
public void validatePullMessageTest() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicC");
- pullMessageRequestHeader.setConsumerGroup("consumerGroupA");
+ pullMessageRequestHeader.setConsumerGroup("groupC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
@@ -190,7 +242,7 @@ public void validatePullMessageTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -203,7 +255,7 @@ public void validatePullMessageTest() {
public void validateConsumeMessageBackTest() {
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = new ConsumerSendMsgBackRequestHeader();
consumerSendMsgBackRequestHeader.setOriginTopic("topicC");
- consumerSendMsgBackRequestHeader.setGroup("consumerGroupA");
+ consumerSendMsgBackRequestHeader.setGroup("groupC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, consumerSendMsgBackRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
@@ -211,7 +263,7 @@ public void validateConsumeMessageBackTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -231,7 +283,7 @@ public void validateQueryMessageTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -268,9 +320,9 @@ public void validateHeartBeatTest() {
Set consumerDataSet = new HashSet<>();
Set subscriptionDataSet = new HashSet<>();
ProducerData producerData = new ProducerData();
- producerData.setGroupName("producerGroupA");
+ producerData.setGroupName("groupB");
ConsumerData consumerData = new ConsumerData();
- consumerData.setGroupName("consumerGroupA");
+ consumerData.setGroupName("groupC");
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic("topicC");
producerDataSet.add(producerData);
@@ -287,7 +339,7 @@ public void validateHeartBeatTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -299,7 +351,7 @@ public void validateHeartBeatTest() {
@Test
public void validateUnRegisterClientTest() {
UnregisterClientRequestHeader unregisterClientRequestHeader = new UnregisterClientRequestHeader();
- unregisterClientRequestHeader.setConsumerGroup("consumerGroupA");
+ unregisterClientRequestHeader.setConsumerGroup("groupB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, unregisterClientRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
@@ -307,7 +359,7 @@ public void validateUnRegisterClientTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -319,7 +371,7 @@ public void validateUnRegisterClientTest() {
@Test
public void validateGetConsumerListByGroupTest() {
GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = new GetConsumerListByGroupRequestHeader();
- getConsumerListByGroupRequestHeader.setConsumerGroup("consumerGroupA");
+ getConsumerListByGroupRequestHeader.setConsumerGroup("groupB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, getConsumerListByGroupRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
@@ -327,7 +379,7 @@ public void validateGetConsumerListByGroupTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -339,7 +391,7 @@ public void validateGetConsumerListByGroupTest() {
@Test
public void validateUpdateConsumerOffSetTest() {
UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = new UpdateConsumerOffsetRequestHeader();
- updateConsumerOffsetRequestHeader.setConsumerGroup("consumerGroupA");
+ updateConsumerOffsetRequestHeader.setConsumerGroup("groupB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, updateConsumerOffsetRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
@@ -347,7 +399,7 @@ public void validateUpdateConsumerOffSetTest() {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
@@ -427,8 +479,12 @@ public void validateGetAllTopicConfigTest() {
@Test
public void addAccessAclYamlConfigTest() throws InterruptedException {
- String targetFileName = System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml";
- Map backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ String backupFileName = System.getProperty("rocketmq.home.dir")
+ + File.separator + "conf/plain_acl_bak.yml".replace("/", File.separator);
+ String targetFileName = System.getProperty("rocketmq.home.dir")
+ + File.separator + "conf/plain_acl.yml".replace("/", File.separator);
+ PlainAccessData backUpAclConfigMap = AclUtils.getYamlDataObject(backupFileName, PlainAccessData.class);
+ AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("rocketmq3");
@@ -436,11 +492,11 @@ public void addAccessAclYamlConfigTest() throws InterruptedException {
plainAccessConfig.setWhiteRemoteAddress("192.168.0.*");
plainAccessConfig.setDefaultGroupPerm("PUB");
plainAccessConfig.setDefaultTopicPerm("SUB");
- List topicPerms = new ArrayList();
+ List topicPerms = new ArrayList<>();
topicPerms.add("topicC=PUB|SUB");
topicPerms.add("topicB=PUB");
plainAccessConfig.setTopicPerms(topicPerms);
- List groupPerms = new ArrayList();
+ List groupPerms = new ArrayList<>();
groupPerms.add("groupB=PUB|SUB");
groupPerms.add("groupC=DENY");
plainAccessConfig.setGroupPerms(groupPerms);
@@ -473,9 +529,9 @@ public void addAccessAclYamlConfigTest() throws InterruptedException {
Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(), 2);
String aclFileName = System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml";
- Map readableMap = AclUtils.getYamlDataObject(aclFileName, Map.class);
- List