From 180a7dbd683c070d13aa443108986041f693acf9 Mon Sep 17 00:00:00 2001 From: wangshaojie4039 Date: Fri, 4 Jan 2019 17:37:10 +0800 Subject: [PATCH] [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature. (#651) * [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature. * [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature. * [ISSUE#403]fix lock/unlock issue,and optimize the codes. --- acl/pom.xml | 4 + .../acl/plain/PlainPermissionLoader.java | 153 ++++++------------ .../acl/plain/PlainAccessValidatorTest.java | 37 +++++ .../acl/plain/PlainPermissionLoaderTest.java | 63 ++++---- .../resources/conf/watch/plain_acl_watch.yml | 34 ++-- 5 files changed, 150 insertions(+), 141 deletions(-) diff --git a/acl/pom.xml b/acl/pom.xml index aab09001ac8..4978ad5e9cf 100644 --- a/acl/pom.xml +++ b/acl/pom.xml @@ -37,6 +37,10 @@ ${project.groupId} rocketmq-common + + ${project.groupId} + rocketmq-srvutil + org.yaml snakeyaml diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java index 1b8269332f0..9a5fea73241 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java @@ -19,28 +19,20 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import java.io.File; -import java.io.IOException; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.StringUtils; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.Permission; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.srvutil.FileWatchService; public class PlainPermissionLoader { @@ -48,25 +40,31 @@ public class PlainPermissionLoader { private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml"; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE); - private Map plainAccessResourceMap = new HashMap<>(); + private Map plainAccessResourceMap = new HashMap<>(); - private List globalWhiteRemoteAddressStrategy = new ArrayList<>(); + private List globalWhiteRemoteAddressStrategy = new ArrayList<>(); private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory(); private boolean isWatchStart; public PlainPermissionLoader() { - initialize(); + load(); watch(); } - public void initialize() { + public void load() { + + Map plainAccessResourceMap = new HashMap<>(); + List globalWhiteRemoteAddressStrategy = new ArrayList<>(); + JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class); @@ -77,92 +75,42 @@ public void initialize() { JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { - addGlobalWhiteRemoteAddress(globalWhiteRemoteAddressesList.getString(i)); + globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. + getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } } JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { - List plainAccessList = accounts.toJavaList(PlainAccessConfig.class); - for (PlainAccessConfig plainAccess : plainAccessList) { - this.addPlainAccessResource(getPlainAccessResource(plainAccess)); + List plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); + for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { + PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); + plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } + + this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; + this.plainAccessResourceMap = plainAccessResourceMap; } private void watch() { - String version = System.getProperty("java.version"); - String[] str = StringUtils.split(version, "."); - if (Integer.valueOf(str[1]) < 7) { - log.warn("Watch need jdk equal or greater than 1.7, current version is {}", str[1]); - return; - } - try { - int fileIndex = fileName.lastIndexOf("/") + 1; - String watchDirectory = fileName.substring(0, fileIndex); - final String watchFileName = fileName.substring(fileIndex); - log.info("watch directory is {} , watch file name is {} ", fileHome + File.separator + watchDirectory, watchFileName); - - final WatchService watcher = FileSystems.getDefault().newWatchService(); - Path p = Paths.get(fileHome + File.separator + watchDirectory); - p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE); - ServiceThread watcherServcie = new ServiceThread() { - - public void run() { - while (true) { - try { - WatchKey watchKey = watcher.take(); - List> watchEvents = watchKey.pollEvents(); - for (WatchEvent event : watchEvents) { - if (watchFileName.equals(event.context().toString()) - && (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind()) - || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) { - log.info("{} make a difference change is : {}", watchFileName, event.toString()); - //Clearing the info, may result in a non-available time - PlainPermissionLoader.this.clearPermissionInfo(); - initialize(); - } - } - watchKey.reset(); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - UtilAll.sleep(3000); - - } - } - } - + String watchFilePath = fileHome + fileName; + FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() { @Override - public String getServiceName() { - return "AclWatcherService"; + public void onChanged(String path) { + log.info("The plain acl yml changed, reload the context"); + load(); } - - }; - watcherServcie.start(); + }); + fileWatchService.start(); log.info("Succeed to start AclWatcherService"); this.isWatchStart = true; - } catch (IOException e) { + } catch (Exception e) { log.error("Failed to start AclWatcherService", e); } } - PlainAccessResource getPlainAccessResource(PlainAccessConfig plainAccess) { - PlainAccessResource plainAccessResource = new PlainAccessResource(); - plainAccessResource.setAccessKey(plainAccess.getAccessKey()); - plainAccessResource.setSecretKey(plainAccess.getSecretKey()); - plainAccessResource.setWhiteRemoteAddress(plainAccess.getWhiteRemoteAddress()); - - plainAccessResource.setAdmin(plainAccess.isAdmin()); - - plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccess.getDefaultGroupPerm())); - plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccess.getDefaultTopicPerm())); - - Permission.parseResourcePerms(plainAccessResource, false, plainAccess.getGroupPerms()); - Permission.parseResourcePerms(plainAccessResource, true, plainAccess.getTopicPerms()); - return plainAccessResource; - } - void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); @@ -200,31 +148,32 @@ void clearPermissionInfo() { this.globalWhiteRemoteAddressStrategy.clear(); } - public void addPlainAccessResource(PlainAccessResource plainAccessResource) throws AclException { - if (plainAccessResource.getAccessKey() == null - || plainAccessResource.getSecretKey() == null - || plainAccessResource.getAccessKey().length() <= 6 - || plainAccessResource.getSecretKey().length() <= 6) { + public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException { + if (plainAccessConfig.getAccessKey() == null + || plainAccessConfig.getSecretKey() == null + || plainAccessConfig.getAccessKey().length() <= 6 + || plainAccessConfig.getSecretKey().length() <= 6) { throw new AclException(String.format( "The accessKey=%s and secretKey=%s cannot be null and length should longer than 6", - plainAccessResource.getAccessKey(), plainAccessResource.getSecretKey())); + plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey())); } - try { - RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory - .getRemoteAddressStrategy(plainAccessResource); - plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategy); + PlainAccessResource plainAccessResource = new PlainAccessResource(); + plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey()); + plainAccessResource.setSecretKey(plainAccessConfig.getSecretKey()); + plainAccessResource.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress()); - if (plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { - log.warn("Duplicate acl config for {}, the newly one may overwrite the old", plainAccessResource.getAccessKey()); - } - plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource); - } catch (Exception e) { - throw new AclException(String.format("Load plain access resource failed %s %s", e.getMessage(), plainAccessResource.toString()), e); - } - } + plainAccessResource.setAdmin(plainAccessConfig.isAdmin()); + + plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultGroupPerm())); + plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultTopicPerm())); - private void addGlobalWhiteRemoteAddress(String remoteAddresses) { - globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(remoteAddresses)); + Permission.parseResourcePerms(plainAccessResource, false, plainAccessConfig.getGroupPerms()); + Permission.parseResourcePerms(plainAccessResource, true, plainAccessConfig.getTopicPerms()); + + plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory. + getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress())); + + return plainAccessResource; } public void validate(PlainAccessResource plainAccessResource) { diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java index 77bbb1193f1..16e770206cb 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.acl.common.AclUtils; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.common.protocol.RequestCode; @@ -229,5 +230,41 @@ public void validateUpdateConsumerOffSetTest() { plainAccessValidator.validate(accessResource); } + @Test(expected = AclException.class) + public void validateNullAccessKeyTest() { + SessionCredentials sessionCredentials=new SessionCredentials(); + sessionCredentials.setAccessKey("RocketMQ1"); + sessionCredentials.setSecretKey("1234"); + AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials); + SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); + messageRequestHeader.setTopic("topicB"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + aclClientRPCHook.doBeforeRequest("", remotingCommand); + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1"); + plainAccessValidator.validate(accessResource); + } + + @Test(expected = AclException.class) + public void validateErrorSecretKeyTest() { + SessionCredentials sessionCredentials=new SessionCredentials(); + sessionCredentials.setAccessKey("RocketMQ"); + sessionCredentials.setSecretKey("1234"); + AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials); + SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader(); + messageRequestHeader.setTopic("topicB"); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader); + aclClientRPCHook.doBeforeRequest("", remotingCommand); + + ByteBuffer buf = remotingCommand.encodeHeader(); + buf.getInt(); + buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf); + buf.position(0); + PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1"); + plainAccessValidator.validate(accessResource); + } } diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java index 77d418e9bc4..ebbc4fd260c 100644 --- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java +++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java @@ -41,6 +41,7 @@ public class PlainPermissionLoaderTest { PlainAccessResource ANYPlainAccessResource; PlainAccessResource DENYPlainAccessResource; PlainAccessResource plainAccessResource = new PlainAccessResource(); + PlainAccessConfig plainAccessConfig = new PlainAccessConfig(); PlainAccessResource plainAccessResourceTwo = new PlainAccessResource(); Set adminCode = new HashSet<>(); @@ -88,24 +89,22 @@ public PlainAccessResource clonePlainAccessResource(byte perm) { } @Test - public void getPlainAccessResourceTest() { - PlainAccessResource plainAccessResource = new PlainAccessResource(); + public void buildPlainAccessResourceTest() { + PlainAccessResource plainAccessResource = null; PlainAccessConfig plainAccess = new PlainAccessConfig(); plainAccess.setAccessKey("RocketMQ"); - plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess); - Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ"); - plainAccess.setSecretKey("12345678"); - plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); + Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ"); Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678"); plainAccess.setWhiteRemoteAddress("127.0.0.1"); - plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1"); plainAccess.setAdmin(true); - plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); Assert.assertEquals(plainAccessResource.isAdmin(), true); List groups = new ArrayList(); @@ -113,7 +112,7 @@ public void getPlainAccessResourceTest() { groups.add("groupB=PUB|SUB"); groups.add("groupC=PUB"); plainAccess.setGroupPerms(groups); - plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); Map resourcePermMap = plainAccessResource.getResourcePermMap(); Assert.assertEquals(resourcePermMap.size(), 3); @@ -126,7 +125,7 @@ public void getPlainAccessResourceTest() { topics.add("topicB=PUB|SUB"); topics.add("topicC=PUB"); plainAccess.setTopicPerms(topics); - plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess); + plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess); resourcePermMap = plainAccessResource.getResourcePermMap(); Assert.assertEquals(resourcePermMap.size(), 6); @@ -158,35 +157,41 @@ public void checkPerm() { plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource); } + @Test(expected = AclException.class) + public void checkErrorPerm() { + plainAccessResource = new PlainAccessResource(); + plainAccessResource.addResourceAndPerm("topicF", Permission.SUB); + plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource); + } @Test(expected = AclException.class) public void accountNullTest() { - plainAccessResource.setAccessKey(null); - plainPermissionLoader.addPlainAccessResource(plainAccessResource); + plainAccessConfig.setAccessKey(null); + plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void accountThanTest() { - plainAccessResource.setAccessKey("123"); - plainPermissionLoader.addPlainAccessResource(plainAccessResource); + plainAccessConfig.setAccessKey("123"); + plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void passWordtNullTest() { - plainAccessResource.setAccessKey(null); - plainPermissionLoader.addPlainAccessResource(plainAccessResource); + plainAccessConfig.setAccessKey(null); + plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void passWordThanTest() { - plainAccessResource.setAccessKey("123"); - plainPermissionLoader.addPlainAccessResource(plainAccessResource); + plainAccessConfig.setAccessKey("123"); + plainPermissionLoader.buildPlainAccessResource(plainAccessConfig); } @Test(expected = AclException.class) public void testPlainAclPlugEngineInit() { System.setProperty("rocketmq.home.dir", ""); - new PlainPermissionLoader().initialize(); + new PlainPermissionLoader().load(); } @SuppressWarnings("unchecked") @@ -203,24 +208,20 @@ public void cleanAuthenticationInfoTest() throws IllegalAccessException { @Test public void isWatchStartTest() { - System.setProperty("java.version", "1.7.11"); + PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader(); Assert.assertTrue(plainPermissionLoader.isWatchStart()); - System.setProperty("java.version", "1.6.11"); - plainPermissionLoader = new PlainPermissionLoader(); - Assert.assertFalse(plainPermissionLoader.isWatchStart()); } + @Test - public void testWatch() throws IOException, IllegalAccessException { - System.setProperty("java.version", "1.7.11"); - System.setProperty("rocketmq.home.dir", "src/test/resources/conf"); - System.setProperty("rocketmq.acl.plain.file", "watch/plain_acl_watch.yml"); - String fileName = "src/test/resources/conf/watch/plain_acl_watch.yml"; + public void testWatch() throws IOException, IllegalAccessException ,InterruptedException{ + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl-test.yml"); + String fileName =System.getProperty("rocketmq.home.dir", "src/test/resources")+System.getProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); File transport = new File(fileName); transport.delete(); transport.createNewFile(); - FileWriter writer = new FileWriter(transport); writer.write("accounts:\r\n"); writer.write("- accessKey: watchrocketmq\r\n"); @@ -250,7 +251,7 @@ public void testWatch() throws IOException, IllegalAccessException { writer.flush(); writer.close(); - UtilAll.sleep(1000); + Thread.sleep(1000); { Map plainAccessResourceMap = (Map) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true); PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1"); @@ -260,6 +261,8 @@ public void testWatch() throws IOException, IllegalAccessException { } transport.delete(); + System.setProperty("rocketmq.home.dir", "src/test/resources"); + System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml"); } @Test(expected = AclException.class) diff --git a/acl/src/test/resources/conf/watch/plain_acl_watch.yml b/acl/src/test/resources/conf/watch/plain_acl_watch.yml index d705092a1e1..9d2c3954941 100644 --- a/acl/src/test/resources/conf/watch/plain_acl_watch.yml +++ b/acl/src/test/resources/conf/watch/plain_acl_watch.yml @@ -1,9 +1,25 @@ -accounts: -- accessKey: watchrocketmq - secretKey: 12345678 - whiteRemoteAddress: 127.0.0.1 - admin: true -- accessKey: watchrocketmq1 - secretKey: 88888888 - whiteRemoteAddress: 127.0.0.1 - admin: false +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## suggested format +accounts: +- accessKey: watchrocketmq + secretKey: 12345678 + whiteRemoteAddress: 127.0.0.1 + admin: true +- accessKey: watchrocketmq1 + secretKey: 88888888 + whiteRemoteAddress: 127.0.0.1 + admin: false