Skip to content

Commit

Permalink
[ISSUE#403] fix acl config file watch bug,clean and optimize the code…
Browse files Browse the repository at this point in the history
…s for acl feature. (apache#651)

* [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature.

* [ISSUE#403] fix acl config file watch bug,clean and optimize the codes for acl feature.

* [ISSUE#403]fix lock/unlock issue,and optimize the codes.
  • Loading branch information
wangshaojie4039 authored and dongeforever committed Jan 4, 2019
1 parent 8afd3bf commit 180a7db
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 141 deletions.
4 changes: 4 additions & 0 deletions acl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,52 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.srvutil.FileWatchService;

public class PlainPermissionLoader {

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));

private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);

private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();

private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();

private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();

private boolean isWatchStart;

public PlainPermissionLoader() {
initialize();
load();
watch();
}

public void initialize() {
public void load() {

Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();

JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);

Expand All @@ -77,92 +75,42 @@ public void initialize() {
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
addGlobalWhiteRemoteAddress(globalWhiteRemoteAddressesList.getString(i));
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}

JSONArray accounts = plainAclConfData.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccess : plainAccessList) {
this.addPlainAccessResource(getPlainAccessResource(plainAccess));
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
}
}

this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;
}

private void watch() {
String version = System.getProperty("java.version");
String[] str = StringUtils.split(version, ".");
if (Integer.valueOf(str[1]) < 7) {
log.warn("Watch need jdk equal or greater than 1.7, current version is {}", str[1]);
return;
}

try {
int fileIndex = fileName.lastIndexOf("/") + 1;
String watchDirectory = fileName.substring(0, fileIndex);
final String watchFileName = fileName.substring(fileIndex);
log.info("watch directory is {} , watch file name is {} ", fileHome + File.separator + watchDirectory, watchFileName);

final WatchService watcher = FileSystems.getDefault().newWatchService();
Path p = Paths.get(fileHome + File.separator + watchDirectory);
p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
ServiceThread watcherServcie = new ServiceThread() {

public void run() {
while (true) {
try {
WatchKey watchKey = watcher.take();
List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
for (WatchEvent<?> event : watchEvents) {
if (watchFileName.equals(event.context().toString())
&& (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())
|| StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
log.info("{} make a difference change is : {}", watchFileName, event.toString());
//Clearing the info, may result in a non-available time
PlainPermissionLoader.this.clearPermissionInfo();
initialize();
}
}
watchKey.reset();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
UtilAll.sleep(3000);

}
}
}

String watchFilePath = fileHome + fileName;
FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
@Override
public String getServiceName() {
return "AclWatcherService";
public void onChanged(String path) {
log.info("The plain acl yml changed, reload the context");
load();
}

};
watcherServcie.start();
});
fileWatchService.start();
log.info("Succeed to start AclWatcherService");
this.isWatchStart = true;
} catch (IOException e) {
} catch (Exception e) {
log.error("Failed to start AclWatcherService", e);
}
}

PlainAccessResource getPlainAccessResource(PlainAccessConfig plainAccess) {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccess.getAccessKey());
plainAccessResource.setSecretKey(plainAccess.getSecretKey());
plainAccessResource.setWhiteRemoteAddress(plainAccess.getWhiteRemoteAddress());

plainAccessResource.setAdmin(plainAccess.isAdmin());

plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccess.getDefaultGroupPerm()));
plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccess.getDefaultTopicPerm()));

Permission.parseResourcePerms(plainAccessResource, false, plainAccess.getGroupPerms());
Permission.parseResourcePerms(plainAccessResource, true, plainAccess.getTopicPerms());
return plainAccessResource;
}

void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
Expand Down Expand Up @@ -200,31 +148,32 @@ void clearPermissionInfo() {
this.globalWhiteRemoteAddressStrategy.clear();
}

public void addPlainAccessResource(PlainAccessResource plainAccessResource) throws AclException {
if (plainAccessResource.getAccessKey() == null
|| plainAccessResource.getSecretKey() == null
|| plainAccessResource.getAccessKey().length() <= 6
|| plainAccessResource.getSecretKey().length() <= 6) {
public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException {
if (plainAccessConfig.getAccessKey() == null
|| plainAccessConfig.getSecretKey() == null
|| plainAccessConfig.getAccessKey().length() <= 6
|| plainAccessConfig.getSecretKey().length() <= 6) {
throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessResource.getAccessKey(), plainAccessResource.getSecretKey()));
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
}
try {
RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory
.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategy);
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
plainAccessResource.setSecretKey(plainAccessConfig.getSecretKey());
plainAccessResource.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());

if (plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
log.warn("Duplicate acl config for {}, the newly one may overwrite the old", plainAccessResource.getAccessKey());
}
plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
} catch (Exception e) {
throw new AclException(String.format("Load plain access resource failed %s %s", e.getMessage(), plainAccessResource.toString()), e);
}
}
plainAccessResource.setAdmin(plainAccessConfig.isAdmin());

plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultGroupPerm()));
plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccessConfig.getDefaultTopicPerm()));

private void addGlobalWhiteRemoteAddress(String remoteAddresses) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(remoteAddresses));
Permission.parseResourcePerms(plainAccessResource, false, plainAccessConfig.getGroupPerms());
Permission.parseResourcePerms(plainAccessResource, true, plainAccessConfig.getTopicPerms());

plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress()));

return plainAccessResource;
}

public void validate(PlainAccessResource plainAccessResource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand Down Expand Up @@ -229,5 +230,41 @@ public void validateUpdateConsumerOffSetTest() {
plainAccessValidator.validate(accessResource);
}

@Test(expected = AclException.class)
public void validateNullAccessKeyTest() {
SessionCredentials sessionCredentials=new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ1");
sessionCredentials.setSecretKey("1234");
AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);

ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
plainAccessValidator.validate(accessResource);
}

@Test(expected = AclException.class)
public void validateErrorSecretKeyTest() {
SessionCredentials sessionCredentials=new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
sessionCredentials.setSecretKey("1234");
AclClientRPCHook aclClientRPCHook=new AclClientRPCHook(sessionCredentials);
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);

ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
plainAccessValidator.validate(accessResource);
}
}
Loading

0 comments on commit 180a7db

Please sign in to comment.