diff --git a/.gitignore b/.gitignore
index bd19c1f640a..8abdfd8fd6e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,4 @@ devenv
!LICENSE-BIN
.DS_Store
localbin
+nohup.out
diff --git a/acl/pom.xml b/acl/pom.xml
new file mode 100644
index 00000000000..03ce95cd07f
--- /dev/null
+++ b/acl/pom.xml
@@ -0,0 +1,53 @@
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-all
+ 4.4.0-SNAPSHOT
+
+ rocketmq-acl
+ rocketmq-acl ${project.version}
+
+ http://maven.apache.org
+
+ UTF-8
+
+
+
+ ${project.groupId}
+ rocketmq-remoting
+
+
+ ${project.groupId}
+ rocketmq-logging
+
+
+ ${project.groupId}
+ rocketmq-common
+
+
+ org.yaml
+ snakeyaml
+
+
+ commons-codec
+ commons-codec
+
+
+ org.apache.commons
+ commons-lang3
+
+
+
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/AccessResource.java
new file mode 100644
index 00000000000..e30febc5719
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/AccessResource.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl;
+
+public interface AccessResource {
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
new file mode 100644
index 00000000000..0b1b0823c50
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public interface AccessValidator {
+ /**
+ * Parse to get the AccessResource(user, resource, needed permission)
+ *
+ * @param request
+ * @return
+ */
+ AccessResource parse(RemotingCommand request, String remoteAddr);
+
+ /**
+ * Validate the access resource.
+ *
+ * @param accessResource
+ */
+ void validate(AccessResource accessResource);
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
new file mode 100644
index 00000000000..dd8ce1e204d
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclClientRPCHook.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.lang.reflect.Field;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.acl.common.SessionCredentials.ACCESS_KEY;
+import static org.apache.rocketmq.acl.common.SessionCredentials.SECURITY_TOKEN;
+import static org.apache.rocketmq.acl.common.SessionCredentials.SIGNATURE;
+
+public class AclClientRPCHook implements RPCHook {
+ private final SessionCredentials sessionCredentials;
+ protected ConcurrentHashMap, Field[]> fieldCache =
+ new ConcurrentHashMap, Field[]>();
+
+ public AclClientRPCHook(SessionCredentials sessionCredentials) {
+ this.sessionCredentials = sessionCredentials;
+ }
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+ byte[] total = AclUtils.combineRequestContent(request,
+ parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
+ String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
+ request.addExtField(SIGNATURE, signature);
+ request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
+
+ if (sessionCredentials.getSecurityToken() != null) {
+ request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
+ }
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
+
+ }
+
+ protected SortedMap parseRequestContent(RemotingCommand request, String ak, String securityToken) {
+ CommandCustomHeader header = request.readCustomHeader();
+ // sort property
+ SortedMap map = new TreeMap();
+ map.put(ACCESS_KEY, ak);
+ if (securityToken != null) {
+ map.put(SECURITY_TOKEN, securityToken);
+ }
+ try {
+ // add header properties
+ if (null != header) {
+ Field[] fields = fieldCache.get(header.getClass());
+ if (null == fields) {
+ fields = header.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ }
+ Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
+ if (null != tmp) {
+ fields = tmp;
+ }
+ }
+
+ for (Field field : fields) {
+ Object value = field.get(header);
+ if (null != value && !field.isSynthetic()) {
+ map.put(field.getName(), value.toString());
+ }
+ }
+ }
+ return map;
+ } catch (Exception e) {
+ throw new RuntimeException("incompatible exception.", e);
+ }
+ }
+
+ public SessionCredentials getSessionCredentials() {
+ return sessionCredentials;
+ }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclException.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclException.java
new file mode 100644
index 00000000000..0bc97db9110
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+public class AclException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ private String status;
+ private int code;
+
+ public AclException(String status, int code) {
+ super();
+ this.status = status;
+ this.code = code;
+ }
+
+ public AclException(String status, int code, String message) {
+ super(message);
+ this.status = status;
+ this.code = code;
+ }
+
+ public AclException(String status, int code, Throwable throwable) {
+ super(throwable);
+ this.status = status;
+ this.code = code;
+ }
+
+ public AclException(String message) {
+ super(message);
+ }
+
+ public AclException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+ public AclException(String status, int code, String message, Throwable throwable) {
+ super(message, throwable);
+ this.status = status;
+ this.code = code;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclSigner.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclSigner.java
new file mode 100644
index 00000000000..61e9350663f
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclSigner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.nio.charset.Charset;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class AclSigner {
+ public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
+ public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);
+ private static final int CAL_SIGNATURE_FAILED = 10015;
+ private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";
+
+ public static String calSignature(String data, String key) throws AclException {
+ return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
+ }
+
+ public static String calSignature(String data, String key, SigningAlgorithm algorithm,
+ Charset charset) throws AclException {
+ return signAndBase64Encode(data, key, algorithm, charset);
+ }
+
+ private static String signAndBase64Encode(String data, String key, SigningAlgorithm algorithm, Charset charset)
+ throws AclException {
+ try {
+ byte[] signature = sign(data.getBytes(charset), key.getBytes(charset), algorithm);
+ return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
+ } catch (Exception e) {
+ String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
+ log.error(message, e);
+ throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
+ }
+ }
+
+ private static byte[] sign(byte[] data, byte[] key, SigningAlgorithm algorithm) throws AclException {
+ try {
+ Mac mac = Mac.getInstance(algorithm.toString());
+ mac.init(new SecretKeySpec(key, algorithm.toString()));
+ return mac.doFinal(data);
+ } catch (Exception e) {
+ String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
+ log.error(message, e);
+ throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
+ }
+ }
+
+ public static String calSignature(byte[] data, String key) throws AclException {
+ return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
+ }
+
+ public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm,
+ Charset charset) throws AclException {
+ return signAndBase64Encode(data, key, algorithm, charset);
+ }
+
+ private static String signAndBase64Encode(byte[] data, String key, SigningAlgorithm algorithm, Charset charset)
+ throws AclException {
+ try {
+ byte[] signature = sign(data, key.getBytes(charset), algorithm);
+ return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
+ } catch (Exception e) {
+ String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
+ log.error(message, e);
+ throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
+ }
+ }
+
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
new file mode 100644
index 00000000000..1a618456f40
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.SortedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.yaml.snakeyaml.Yaml;
+
+import static org.apache.rocketmq.acl.common.SessionCredentials.CHARSET;
+
+public class AclUtils {
+
+ public static byte[] combineRequestContent(RemotingCommand request, SortedMap fieldsMap) {
+ try {
+ StringBuilder sb = new StringBuilder("");
+ for (Map.Entry entry : fieldsMap.entrySet()) {
+ if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
+ sb.append(entry.getValue());
+ }
+ }
+
+ return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
+ } catch (Exception e) {
+ throw new RuntimeException("incompatible exception.", e);
+ }
+ }
+
+ public static byte[] combineBytes(byte[] b1, byte[] b2) {
+ int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
+ byte[] total = new byte[size];
+ if (null != b1)
+ System.arraycopy(b1, 0, total, 0, b1.length);
+ if (null != b2)
+ System.arraycopy(b2, 0, total, b1.length, b2.length);
+ return total;
+ }
+
+ public static String calSignature(byte[] data, String secretKey) {
+ String signature = AclSigner.calSignature(data, secretKey);
+ return signature;
+ }
+
+ public static void verify(String netaddress, int index) {
+ if (!AclUtils.isScope(netaddress, index)) {
+ throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
+ }
+ }
+
+ public static String[] getAddreeStrArray(String netaddress, String four) {
+ String[] fourStrArray = StringUtils.split(four.substring(1, four.length() - 1), ",");
+ String address = netaddress.substring(0, netaddress.indexOf("{"));
+ String[] addreeStrArray = new String[fourStrArray.length];
+ for (int i = 0; i < fourStrArray.length; i++) {
+ addreeStrArray[i] = address + fourStrArray[i];
+ }
+ return addreeStrArray;
+ }
+
+ public static boolean isScope(String num, int index) {
+ String[] strArray = StringUtils.split(num, ".");
+ if (strArray.length != 4) {
+ return false;
+ }
+ return isScope(strArray, index);
+
+ }
+
+ public static boolean isScope(String[] num, int index) {
+ if (num.length <= index) {
+
+ }
+ for (int i = 0; i < index; i++) {
+ if (!isScope(num[i])) {
+ return false;
+ }
+ }
+ return true;
+
+ }
+
+ public static boolean isScope(String num) {
+ return isScope(Integer.valueOf(num.trim()));
+ }
+
+ public static boolean isScope(int num) {
+ return num >= 0 && num <= 255;
+ }
+
+ public static boolean isAsterisk(String asterisk) {
+ return asterisk.indexOf('*') > -1;
+ }
+
+ public static boolean isColon(String colon) {
+ return colon.indexOf(',') > -1;
+ }
+
+ public static boolean isMinus(String minus) {
+ return minus.indexOf('-') > -1;
+
+ }
+
+ public static T getYamlDataObject(String path, Class clazz) {
+ Yaml ymal = new Yaml();
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(new File(path));
+ return ymal.loadAs(fis, clazz);
+ } catch (Exception e) {
+ throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
+ } finally {
+ if (fis != null) {
+ try {
+ fis.close();
+ } catch (IOException e) {
+ throw new AclException("close transport fileInputStream Exception", e);
+ }
+ }
+ }
+ }
+
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java b/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
new file mode 100644
index 00000000000..7a95ee053d7
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plain.PlainAccessResource;
+import org.apache.rocketmq.common.protocol.RequestCode;
+
+public class Permission {
+
+ public static final byte DENY = 1;
+ public static final byte ANY = 1 << 1;
+ public static final byte PUB = 1 << 2;
+ public static final byte SUB = 1 << 3;
+
+ public static final Set ADMIN_CODE = new HashSet();
+
+ static {
+ // UPDATE_AND_CREATE_TOPIC
+ ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC);
+ // UPDATE_BROKER_CONFIG
+ ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG);
+ // DELETE_TOPIC_IN_BROKER
+ ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER);
+ // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
+ ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP);
+ // DELETE_SUBSCRIPTIONGROUP
+ ADMIN_CODE.add(RequestCode.DELETE_SUBSCRIPTIONGROUP);
+ }
+
+ public static boolean checkPermission(byte neededPerm, byte ownedPerm) {
+ if ((ownedPerm & DENY) > 0) {
+ return false;
+ }
+ if ((neededPerm & ANY) > 0) {
+ return ((ownedPerm & PUB) > 0) || ((ownedPerm & SUB) > 0);
+ }
+ return (neededPerm & ownedPerm) > 0;
+ }
+
+ public static byte parsePermFromString(String permString) {
+ if (permString == null) {
+ return Permission.DENY;
+ }
+ switch (permString.trim()) {
+ case "PUB":
+ return Permission.PUB;
+ case "SUB":
+ return Permission.SUB;
+ case "PUB|SUB":
+ return Permission.PUB | Permission.SUB;
+ case "SUB|PUB":
+ return Permission.PUB | Permission.SUB;
+ case "DENY":
+ return Permission.DENY;
+ default:
+ return Permission.DENY;
+ }
+ }
+
+ public static void parseResourcePerms(PlainAccessResource plainAccessResource, Boolean isTopic,
+ List resources) {
+ if (resources == null || resources.isEmpty()) {
+ return;
+ }
+ for (String resource : resources) {
+ String[] items = StringUtils.split(resource, "=");
+ if (items.length == 2) {
+ plainAccessResource.addResourceAndPerm(isTopic ? items[0].trim() : PlainAccessResource.getRetryTopic(items[0].trim()), parsePermFromString(items[1].trim()));
+ } else {
+ throw new AclException(String.format("Parse resource permission failed for %s:%s", isTopic ? "topic" : "group", resource));
+ }
+ }
+ }
+
+ public static boolean needAdminPerm(Integer code) {
+ return ADMIN_CODE.contains(code);
+ }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/SessionCredentials.java b/acl/src/main/java/org/apache/rocketmq/acl/common/SessionCredentials.java
new file mode 100644
index 00000000000..33a8a34350c
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/SessionCredentials.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Properties;
+import org.apache.rocketmq.common.MixAll;
+
+public class SessionCredentials {
+ public static final Charset CHARSET = Charset.forName("UTF-8");
+ public static final String ACCESS_KEY = "AccessKey";
+ public static final String SECRET_KEY = "SecretKey";
+ public static final String SIGNATURE = "Signature";
+ public static final String SECURITY_TOKEN = "SecurityToken";
+
+ public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",
+ System.getProperty("user.home") + File.separator + "key");
+
+ private String accessKey;
+ private String secretKey;
+ private String securityToken;
+ private String signature;
+
+ public SessionCredentials() {
+ String keyContent = null;
+ try {
+ keyContent = MixAll.file2String(KEY_FILE);
+ } catch (IOException ignore) {
+ }
+ if (keyContent != null) {
+ Properties prop = MixAll.string2Properties(keyContent);
+ if (prop != null) {
+ this.updateContent(prop);
+ }
+ }
+ }
+
+ public SessionCredentials(String accessKey, String secretKey) {
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
+ }
+
+ public SessionCredentials(String accessKey, String secretKey, String securityToken) {
+ this(accessKey, secretKey);
+ this.securityToken = securityToken;
+ }
+
+ public void updateContent(Properties prop) {
+ {
+ String value = prop.getProperty(ACCESS_KEY);
+ if (value != null) {
+ this.accessKey = value.trim();
+ }
+ }
+ {
+ String value = prop.getProperty(SECRET_KEY);
+ if (value != null) {
+ this.secretKey = value.trim();
+ }
+ }
+ {
+ String value = prop.getProperty(SECURITY_TOKEN);
+ if (value != null) {
+ this.securityToken = value.trim();
+ }
+ }
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getSignature() {
+ return signature;
+ }
+
+ public void setSignature(String signature) {
+ this.signature = signature;
+ }
+
+ public String getSecurityToken() {
+ return securityToken;
+ }
+
+ public void setSecurityToken(final String securityToken) {
+ this.securityToken = securityToken;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((accessKey == null) ? 0 : accessKey.hashCode());
+ result = prime * result + ((secretKey == null) ? 0 : secretKey.hashCode());
+ result = prime * result + ((signature == null) ? 0 : signature.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ SessionCredentials other = (SessionCredentials) obj;
+ if (accessKey == null) {
+ if (other.accessKey != null)
+ return false;
+ } else if (!accessKey.equals(other.accessKey))
+ return false;
+
+ if (secretKey == null) {
+ if (other.secretKey != null)
+ return false;
+ } else if (!secretKey.equals(other.secretKey))
+ return false;
+
+ if (signature == null) {
+ if (other.signature != null)
+ return false;
+ } else if (!signature.equals(other.signature))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "SessionCredentials [accessKey=" + accessKey + ", secretKey=" + secretKey + ", signature="
+ + signature + ", SecurityToken=" + securityToken + "]";
+ }
+}
\ No newline at end of file
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/SigningAlgorithm.java b/acl/src/main/java/org/apache/rocketmq/acl/common/SigningAlgorithm.java
new file mode 100644
index 00000000000..6937cdf4905
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/SigningAlgorithm.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;//package com.aliyun.openservices.ons.api.impl.rocketmq.spas;
+
+public enum SigningAlgorithm {
+ HmacSHA1,
+ HmacSHA256,
+ HmacMD5;
+
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
new file mode 100644
index 00000000000..9017bf22ea4
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.common.MixAll;
+
+public class PlainAccessResource implements AccessResource {
+
+ //identify the user
+ private String accessKey;
+
+ private String secretKey;
+
+ private String whiteRemoteAddress;
+
+ private boolean admin;
+
+ private byte defaultTopicPerm = 1;
+
+ private byte defaultGroupPerm = 1;
+
+ private Map resourcePermMap;
+
+ private RemoteAddressStrategy remoteAddressStrategy;
+
+ private int requestCode;
+
+ //the content to calculate the content
+ private byte[] content;
+
+ private String signature;
+
+ private String secretToken;
+
+ private String recognition;
+
+ public PlainAccessResource() {
+ }
+
+ public static boolean isRetryTopic(String topic) {
+ return null != topic && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+ }
+
+ public static String printStr(String resource, boolean isGroup) {
+ if (resource == null) {
+ return null;
+ }
+ if (isGroup) {
+ return String.format("%s:%s", "group", getGroupFromRetryTopic(resource));
+ } else {
+ return String.format("%s:%s", "topic", resource);
+ }
+ }
+
+ public static String getGroupFromRetryTopic(String retryTopic) {
+ if (retryTopic == null) {
+ return null;
+ }
+ return retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ }
+
+ public static String getRetryTopic(String group) {
+ if (group == null) {
+ return null;
+ }
+ return MixAll.getRetryTopic(group);
+ }
+
+ public void addResourceAndPerm(String resource, byte perm) {
+ if (resource == null) {
+ return;
+ }
+ if (resourcePermMap == null) {
+ resourcePermMap = new HashMap<>();
+ }
+ resourcePermMap.put(resource, perm);
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getWhiteRemoteAddress() {
+ return whiteRemoteAddress;
+ }
+
+ public void setWhiteRemoteAddress(String whiteRemoteAddress) {
+ this.whiteRemoteAddress = whiteRemoteAddress;
+ }
+
+ public boolean isAdmin() {
+ return admin;
+ }
+
+ public void setAdmin(boolean admin) {
+ this.admin = admin;
+ }
+
+ public byte getDefaultTopicPerm() {
+ return defaultTopicPerm;
+ }
+
+ public void setDefaultTopicPerm(byte defaultTopicPerm) {
+ this.defaultTopicPerm = defaultTopicPerm;
+ }
+
+ public byte getDefaultGroupPerm() {
+ return defaultGroupPerm;
+ }
+
+ public void setDefaultGroupPerm(byte defaultGroupPerm) {
+ this.defaultGroupPerm = defaultGroupPerm;
+ }
+
+ public Map getResourcePermMap() {
+ return resourcePermMap;
+ }
+
+ public String getRecognition() {
+ return recognition;
+ }
+
+ public void setRecognition(String recognition) {
+ this.recognition = recognition;
+ }
+
+ public int getRequestCode() {
+ return requestCode;
+ }
+
+ public void setRequestCode(int requestCode) {
+ this.requestCode = requestCode;
+ }
+
+ public String getSecretToken() {
+ return secretToken;
+ }
+
+ public void setSecretToken(String secretToken) {
+ this.secretToken = secretToken;
+ }
+
+ public RemoteAddressStrategy getRemoteAddressStrategy() {
+ return remoteAddressStrategy;
+ }
+
+ public void setRemoteAddressStrategy(RemoteAddressStrategy remoteAddressStrategy) {
+ this.remoteAddressStrategy = remoteAddressStrategy;
+ }
+
+ public String getSignature() {
+ return signature;
+ }
+
+ public void setSignature(String signature) {
+ this.signature = signature;
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+
+ public byte[] getContent() {
+ return content;
+ }
+
+ public void setContent(byte[] content) {
+ this.content = content;
+ }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
new file mode 100644
index 00000000000..bb1c0a11c62
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.acl.common.Permission;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic;
+
+public class PlainAccessValidator implements AccessValidator {
+
+ private PlainPermissionLoader aclPlugEngine;
+
+ public PlainAccessValidator() {
+ aclPlugEngine = new PlainPermissionLoader();
+ }
+
+ @Override
+ public AccessResource parse(RemotingCommand request, String remoteAddr) {
+ PlainAccessResource accessResource = new PlainAccessResource();
+ if (remoteAddr != null && remoteAddr.contains(":")) {
+ accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
+ } else {
+ accessResource.setWhiteRemoteAddress(remoteAddr);
+ }
+ accessResource.setRequestCode(request.getCode());
+ accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
+ accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
+ accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
+
+ try {
+ switch (request.getCode()) {
+ case RequestCode.SEND_MESSAGE:
+ accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
+ break;
+ case RequestCode.SEND_MESSAGE_V2:
+ accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
+ break;
+ case RequestCode.CONSUMER_SEND_MSG_BACK:
+ accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
+ accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
+ break;
+ case RequestCode.PULL_MESSAGE:
+ accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
+ accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
+ break;
+ case RequestCode.QUERY_MESSAGE:
+ accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
+ break;
+ case RequestCode.HEART_BEAT:
+ HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+ for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
+ accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
+ for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
+ accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
+ }
+ }
+ break;
+ case RequestCode.UNREGISTER_CLIENT:
+ final UnregisterClientRequestHeader unregisterClientRequestHeader =
+ (UnregisterClientRequestHeader) request
+ .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+ accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
+ break;
+ case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
+ final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
+ (GetConsumerListByGroupRequestHeader) request
+ .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+ accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
+ break;
+ case RequestCode.UPDATE_CONSUMER_OFFSET:
+ final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
+ (UpdateConsumerOffsetRequestHeader) request
+ .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+ accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
+ accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
+ break;
+ default:
+ break;
+
+ }
+ } catch (Throwable t) {
+ throw new AclException(t.getMessage(), t);
+ }
+ // content
+ SortedMap map = new TreeMap();
+ for (Map.Entry entry : request.getExtFields().entrySet()) {
+ if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+ }
+ accessResource.setContent(AclUtils.combineRequestContent(request, map));
+ return accessResource;
+ }
+
+ @Override
+ public void validate(AccessResource accessResource) {
+ aclPlugEngine.validate((PlainAccessResource) accessResource);
+ }
+
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java
new file mode 100644
index 00000000000..9c36ecf71f3
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.acl.common.Permission;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class PlainPermissionLoader {
+
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
+
+ private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
+
+ private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
+ System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+ private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);
+
+ private Map plainAccessResourceMap = new HashMap<>();
+
+ private List globalWhiteRemoteAddressStrategy = new ArrayList<>();
+
+ private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
+
+ private boolean isWatchStart;
+
+ public PlainPermissionLoader() {
+ initialize();
+ watch();
+ }
+
+ public void initialize() {
+ JSONObject accessControlTransport = AclUtils.getYamlDataObject(fileHome + fileName,
+ JSONObject.class);
+
+ if (accessControlTransport == null || accessControlTransport.isEmpty()) {
+ throw new AclException(String.format("%s file is not data", fileHome + fileName));
+ }
+ log.info("BorkerAccessControlTransport data is : ", accessControlTransport.toString());
+ JSONArray globalWhiteRemoteAddressesList = accessControlTransport.getJSONArray("globalWhiteRemoteAddresses");
+ if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
+ for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
+ addGlobalWhiteRemoteAddress(globalWhiteRemoteAddressesList.getString(i));
+ }
+ }
+
+ JSONArray accounts = accessControlTransport.getJSONArray("accounts");
+ if (accounts != null && !accounts.isEmpty()) {
+ List plainAccessList = accounts.toJavaList(PlainAccessConfig.class);
+ for (PlainAccessConfig plainAccess : plainAccessList) {
+ this.addPlainAccessResource(getPlainAccessResource(plainAccess));
+ }
+ }
+ }
+
+ private void watch() {
+ String version = System.getProperty("java.version");
+ String[] str = StringUtils.split(version, ".");
+ if (Integer.valueOf(str[1]) < 7) {
+ log.warn("Watch need jdk equal or greater than 1.7, current version is {}", str[1]);
+ return;
+ }
+
+ try {
+ int fileIndex = fileName.lastIndexOf("/") + 1;
+ String watchDirectory = fileName.substring(0, fileIndex);
+ final String watchFileName = fileName.substring(fileIndex);
+ log.info("watch directory is {} , watch directory file name is {} ", fileHome + watchDirectory, watchFileName);
+
+ final WatchService watcher = FileSystems.getDefault().newWatchService();
+ Path p = Paths.get(fileHome + watchDirectory);
+ p.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
+ ServiceThread watcherServcie = new ServiceThread() {
+
+ public void run() {
+ while (true) {
+ try {
+ WatchKey watchKey = watcher.take();
+ List> watchEvents = watchKey.pollEvents();
+ for (WatchEvent> event : watchEvents) {
+ if (watchFileName.equals(event.context().toString())
+ && (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())
+ || StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind()))) {
+ log.info("{} make a difference change is : {}", watchFileName, event.toString());
+ PlainPermissionLoader.this.clearPermissionInfo();
+ initialize();
+ }
+ }
+ watchKey.reset();
+ } catch (InterruptedException e) {
+ log.error(e.getMessage(), e);
+ UtilAll.sleep(3000);
+
+ }
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return "AclWatcherService";
+ }
+
+ };
+ watcherServcie.start();
+ log.info("Succeed to start AclWatcherService");
+ this.isWatchStart = true;
+ } catch (IOException e) {
+ log.error("Failed to start AclWatcherService", e);
+ }
+ }
+
+ PlainAccessResource getPlainAccessResource(PlainAccessConfig plainAccess) {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setAccessKey(plainAccess.getAccessKey());
+ plainAccessResource.setSecretKey(plainAccess.getSecretKey());
+ plainAccessResource.setWhiteRemoteAddress(plainAccess.getWhiteRemoteAddress());
+
+ plainAccessResource.setAdmin(plainAccess.isAdmin());
+
+ plainAccessResource.setDefaultGroupPerm(Permission.parsePermFromString(plainAccess.getDefaultGroupPerm()));
+ plainAccessResource.setDefaultTopicPerm(Permission.parsePermFromString(plainAccess.getDefaultTopicPerm()));
+
+ Permission.parseResourcePerms(plainAccessResource, false, plainAccess.getGroupPerms());
+ Permission.parseResourcePerms(plainAccessResource, true, plainAccess.getTopicPerms());
+ return plainAccessResource;
+ }
+
+ void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
+ if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
+ throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
+ }
+ Map needCheckedPermMap = needCheckedAccess.getResourcePermMap();
+ Map ownedPermMap = ownedAccess.getResourcePermMap();
+
+ if (needCheckedPermMap == null) {
+ //if the needCheckedPermMap is null,then return
+ return;
+ }
+
+ for (Map.Entry needCheckedEntry : needCheckedPermMap.entrySet()) {
+ String resource = needCheckedEntry.getKey();
+ Byte neededPerm = needCheckedEntry.getValue();
+ boolean isGroup = PlainAccessResource.isRetryTopic(resource);
+
+ if (!ownedPermMap.containsKey(resource)) {
+ //Check the default perm
+ byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
+ needCheckedAccess.getDefaultTopicPerm();
+ if (!Permission.checkPermission(neededPerm, ownedPerm)) {
+ throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
+ }
+ continue;
+ }
+ if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
+ throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
+ }
+ }
+ }
+
+ void clearPermissionInfo() {
+ this.plainAccessResourceMap.clear();
+ this.globalWhiteRemoteAddressStrategy.clear();
+ }
+
+ public void addPlainAccessResource(PlainAccessResource plainAccessResource) throws AclException {
+ if (plainAccessResource.getAccessKey() == null
+ || plainAccessResource.getSecretKey() == null
+ || plainAccessResource.getAccessKey().length() <= 6
+ || plainAccessResource.getSecretKey().length() <= 6) {
+ throw new AclException(String.format(
+ "The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
+ plainAccessResource.getAccessKey(), plainAccessResource.getSecretKey()));
+ }
+ try {
+ RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory
+ .getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategy);
+
+ if (plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
+ log.warn("Duplicate acl config for {}, the newly one may overwrite the old", plainAccessResource.getAccessKey());
+ }
+ plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
+ } catch (Exception e) {
+ throw new AclException(String.format("Load plain access resource failed %s %s", e.getMessage(), plainAccessResource.toString()), e);
+ }
+ }
+
+ private void addGlobalWhiteRemoteAddress(String remoteAddresses) {
+ globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(remoteAddresses));
+ }
+
+ public void validate(PlainAccessResource plainAccessResource) {
+
+ //Step 1, check the global white remote addr
+ for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
+ if (remoteAddressStrategy.match(plainAccessResource)) {
+ return;
+ }
+ }
+
+ if (plainAccessResource.getAccessKey() == null) {
+ throw new AclException(String.format("No accessKey is configured"));
+ }
+
+ if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
+ throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
+ }
+
+ //Step 2, check the white addr for accesskey
+ PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
+ if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
+ return;
+ }
+
+ //Step 3, check the signature
+ String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
+ if (!signature.equals(plainAccessResource.getSignature())) {
+ throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
+ }
+ //Step 4, check perm of each resource
+
+ checkPerm(plainAccessResource, ownedAccess);
+ }
+
+ public boolean isWatchStart() {
+ return isWatchStart;
+ }
+
+ static class PlainAccessConfig {
+
+ private String accessKey;
+
+ private String secretKey;
+
+ private String whiteRemoteAddress;
+
+ private boolean admin;
+
+ private String defaultTopicPerm;
+
+ private String defaultGroupPerm;
+
+ private List topicPerms;
+
+ private List groupPerms;
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getWhiteRemoteAddress() {
+ return whiteRemoteAddress;
+ }
+
+ public void setWhiteRemoteAddress(String whiteRemoteAddress) {
+ this.whiteRemoteAddress = whiteRemoteAddress;
+ }
+
+ public boolean isAdmin() {
+ return admin;
+ }
+
+ public void setAdmin(boolean admin) {
+ this.admin = admin;
+ }
+
+ public String getDefaultTopicPerm() {
+ return defaultTopicPerm;
+ }
+
+ public void setDefaultTopicPerm(String defaultTopicPerm) {
+ this.defaultTopicPerm = defaultTopicPerm;
+ }
+
+ public String getDefaultGroupPerm() {
+ return defaultGroupPerm;
+ }
+
+ public void setDefaultGroupPerm(String defaultGroupPerm) {
+ this.defaultGroupPerm = defaultGroupPerm;
+ }
+
+ public List getTopicPerms() {
+ return topicPerms;
+ }
+
+ public void setTopicPerms(List topicPerms) {
+ this.topicPerms = topicPerms;
+ }
+
+ public List getGroupPerms() {
+ return groupPerms;
+ }
+
+ public void setGroupPerms(List groupPerms) {
+ this.groupPerms = groupPerms;
+ }
+
+ }
+
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategy.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategy.java
new file mode 100644
index 00000000000..60e92960e6a
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategy.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+public interface RemoteAddressStrategy {
+
+ public boolean match(PlainAccessResource plainAccessResource);
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java
new file mode 100644
index 00000000000..10b4734588f
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class RemoteAddressStrategyFactory {
+
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
+
+ public static final NullRemoteAddressStrategy NULL_NET_ADDRESS_STRATEGY = new NullRemoteAddressStrategy();
+
+ public static final BlankRemoteAddressStrategy BLANK_NET_ADDRESS_STRATEGY = new BlankRemoteAddressStrategy();
+
+ public RemoteAddressStrategy getRemoteAddressStrategy(PlainAccessResource plainAccessResource) {
+ return getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress());
+ }
+
+ public RemoteAddressStrategy getRemoteAddressStrategy(String remoteAddr) {
+ if (StringUtils.isBlank(remoteAddr)) {
+ log.warn("white list address is null");
+ return BLANK_NET_ADDRESS_STRATEGY;
+ }
+ if ("*".equals(remoteAddr)) {
+ return NULL_NET_ADDRESS_STRATEGY;
+ }
+ if (remoteAddr.endsWith("}")) {
+ String[] strArray = StringUtils.split(remoteAddr, ".");
+ String four = strArray[3];
+ if (!four.startsWith("{")) {
+ throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
+ }
+ return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, four));
+ } else if (AclUtils.isColon(remoteAddr)) {
+ return new MultipleRemoteAddressStrategy(StringUtils.split(remoteAddr, ","));
+ } else if (AclUtils.isAsterisk(remoteAddr) || AclUtils.isMinus(remoteAddr)) {
+ return new RangeRemoteAddressStrategy(remoteAddr);
+ }
+ return new OneRemoteAddressStrategy(remoteAddr);
+
+ }
+
+ public static class NullRemoteAddressStrategy implements RemoteAddressStrategy {
+ @Override
+ public boolean match(PlainAccessResource plainAccessResource) {
+ return true;
+ }
+
+ }
+
+ public static class BlankRemoteAddressStrategy implements RemoteAddressStrategy {
+ @Override
+ public boolean match(PlainAccessResource plainAccessResource) {
+ return false;
+ }
+
+ }
+
+ public static class MultipleRemoteAddressStrategy implements RemoteAddressStrategy {
+
+ private final Set multipleSet = new HashSet<>();
+
+ public MultipleRemoteAddressStrategy(String[] strArray) {
+ for (String netaddress : strArray) {
+ AclUtils.verify(netaddress, 4);
+ multipleSet.add(netaddress);
+ }
+ }
+
+ @Override
+ public boolean match(PlainAccessResource plainAccessResource) {
+ return multipleSet.contains(plainAccessResource.getWhiteRemoteAddress());
+ }
+
+ }
+
+ public static class OneRemoteAddressStrategy implements RemoteAddressStrategy {
+
+ private String netaddress;
+
+ public OneRemoteAddressStrategy(String netaddress) {
+ this.netaddress = netaddress;
+ AclUtils.verify(netaddress, 4);
+ }
+
+ @Override
+ public boolean match(PlainAccessResource plainAccessResource) {
+ return netaddress.equals(plainAccessResource.getWhiteRemoteAddress());
+ }
+
+ }
+
+ public static class RangeRemoteAddressStrategy implements RemoteAddressStrategy {
+
+ private String head;
+
+ private int start;
+
+ private int end;
+
+ private int index;
+
+ public RangeRemoteAddressStrategy(String remoteAddr) {
+ String[] strArray = StringUtils.split(remoteAddr, ".");
+ if (analysis(strArray, 2) || analysis(strArray, 3)) {
+ AclUtils.verify(remoteAddr, index - 1);
+ StringBuffer sb = new StringBuffer().append(strArray[0].trim()).append(".").append(strArray[1].trim()).append(".");
+ if (index == 3) {
+ sb.append(strArray[2].trim()).append(".");
+ }
+ this.head = sb.toString();
+ }
+ }
+
+ private boolean analysis(String[] strArray, int index) {
+ String value = strArray[index].trim();
+ this.index = index;
+ if ("*".equals(value)) {
+ setValue(0, 255);
+ } else if (AclUtils.isMinus(value)) {
+ if (value.indexOf("-") == 0) {
+ throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception value %s ", value));
+
+ }
+ String[] valueArray = StringUtils.split(value, "-");
+ this.start = Integer.valueOf(valueArray[0]);
+ this.end = Integer.valueOf(valueArray[1]);
+ if (!(AclUtils.isScope(end) && AclUtils.isScope(start) && start <= end)) {
+ throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
+ }
+ }
+ return this.end > 0 ? true : false;
+ }
+
+ private void setValue(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public boolean match(PlainAccessResource plainAccessResource) {
+ String netAddress = plainAccessResource.getWhiteRemoteAddress();
+ if (netAddress.startsWith(this.head)) {
+ String value;
+ if (index == 3) {
+ value = netAddress.substring(this.head.length());
+ } else {
+ value = netAddress.substring(this.head.length(), netAddress.lastIndexOf('.'));
+ }
+ Integer address = Integer.valueOf(value);
+ if (address >= this.start && address <= this.end) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
new file mode 100644
index 00000000000..4169d88fe9f
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
@@ -0,0 +1,18 @@
+package org.apache.rocketmq.acl.common;
+
+import org.junit.Test;
+
+public class AclSignerTest {
+
+ @Test(expected = Exception.class)
+ public void calSignatureExceptionTest(){
+ AclSigner.calSignature(new byte[]{},"");
+ }
+
+ @Test
+ public void calSignatureTest(){
+ AclSigner.calSignature("RocketMQ","12345678");
+ AclSigner.calSignature("RocketMQ".getBytes(),"12345678");
+ }
+
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
new file mode 100644
index 00000000000..72bcda6bb3a
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AclUtilsTest {
+
+ @Test
+ public void getAddreeStrArray() {
+ String address = "1.1.1.{1,2,3,4}";
+ String[] addressArray = AclUtils.getAddreeStrArray(address, "{1,2,3,4}");
+ List newAddressList = new ArrayList<>();
+ for (String a : addressArray) {
+ newAddressList.add(a);
+ }
+
+ List addressList = new ArrayList<>();
+ addressList.add("1.1.1.1");
+ addressList.add("1.1.1.2");
+ addressList.add("1.1.1.3");
+ addressList.add("1.1.1.4");
+ Assert.assertEquals(newAddressList, addressList);
+ }
+
+ @Test
+ public void isScopeStringArray() {
+ String adderss = "12";
+
+ for (int i = 0; i < 6; i++) {
+ boolean isScope = AclUtils.isScope(adderss, 4);
+ if (i == 3) {
+ Assert.assertTrue(isScope);
+ } else {
+ Assert.assertFalse(isScope);
+ }
+ adderss = adderss + ".12";
+ }
+ }
+
+ @Test
+ public void isScopeArray() {
+ String[] adderss = StringUtils.split("12.12.12.12", ".");
+ boolean isScope = AclUtils.isScope(adderss, 4);
+ Assert.assertTrue(isScope);
+ isScope = AclUtils.isScope(adderss, 3);
+ Assert.assertTrue(isScope);
+
+ adderss = StringUtils.split("12.12.1222.1222", ".");
+ isScope = AclUtils.isScope(adderss, 4);
+ Assert.assertFalse(isScope);
+ isScope = AclUtils.isScope(adderss, 3);
+ Assert.assertFalse(isScope);
+
+ }
+
+ @Test
+ public void isScopeStringTest() {
+ for (int i = 0; i < 256; i++) {
+ boolean isScope = AclUtils.isScope(i + "");
+ Assert.assertTrue(isScope);
+ }
+ boolean isScope = AclUtils.isScope("-1");
+ Assert.assertFalse(isScope);
+ isScope = AclUtils.isScope("256");
+ Assert.assertFalse(isScope);
+ }
+
+ @Test
+ public void isScopeTest() {
+ for (int i = 0; i < 256; i++) {
+ boolean isScope = AclUtils.isScope(i);
+ Assert.assertTrue(isScope);
+ }
+ boolean isScope = AclUtils.isScope(-1);
+ Assert.assertFalse(isScope);
+ isScope = AclUtils.isScope(256);
+ Assert.assertFalse(isScope);
+
+ }
+
+ @Test
+ public void isAsteriskTest() {
+ boolean isAsterisk = AclUtils.isAsterisk("*");
+ Assert.assertTrue(isAsterisk);
+
+ isAsterisk = AclUtils.isAsterisk(",");
+ Assert.assertFalse(isAsterisk);
+ }
+
+ @Test
+ public void isColonTest() {
+ boolean isColon = AclUtils.isColon(",");
+ Assert.assertTrue(isColon);
+
+ isColon = AclUtils.isColon("-");
+ Assert.assertFalse(isColon);
+ }
+
+ @Test
+ public void isMinusTest() {
+ boolean isMinus = AclUtils.isMinus("-");
+ Assert.assertTrue(isMinus);
+
+ isMinus = AclUtils.isMinus("*");
+ Assert.assertFalse(isMinus);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void getYamlDataObjectTest() {
+
+ Map map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class);
+ Assert.assertFalse(map.isEmpty());
+ }
+
+ @Test(expected = Exception.class)
+ public void getYamlDataObjectExceptionTest() {
+
+ AclUtils.getYamlDataObject("plain_acl.yml", Map.class);
+ }
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
new file mode 100644
index 00000000000..31820ad7d59
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.acl.plain.PlainAccessResource;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PermissionTest {
+
+ @Test
+ public void fromStringGetPermissionTest() {
+ byte perm = Permission.parsePermFromString("PUB");
+ Assert.assertEquals(perm, Permission.PUB);
+
+ perm = Permission.parsePermFromString("SUB");
+ Assert.assertEquals(perm, Permission.SUB);
+
+ perm = Permission.parsePermFromString("PUB|SUB");
+ Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
+
+ perm = Permission.parsePermFromString("SUB|PUB");
+ Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
+
+ perm = Permission.parsePermFromString("DENY");
+ Assert.assertEquals(perm, Permission.DENY);
+
+ perm = Permission.parsePermFromString("1");
+ Assert.assertEquals(perm, Permission.DENY);
+
+ perm = Permission.parsePermFromString(null);
+ Assert.assertEquals(perm, Permission.DENY);
+
+ }
+
+ @Test
+ public void checkPermissionTest() {
+ boolean boo = Permission.checkPermission(Permission.DENY, Permission.DENY);
+ Assert.assertFalse(boo);
+
+ boo = Permission.checkPermission(Permission.PUB, Permission.PUB);
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.SUB, Permission.SUB);
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.PUB, (byte) (Permission.PUB|Permission.SUB));
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.SUB, (byte) (Permission.PUB|Permission.SUB));
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.ANY, (byte) (Permission.PUB|Permission.SUB));
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.ANY, Permission.SUB);
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.ANY, Permission.PUB);
+ Assert.assertTrue(boo);
+
+ boo = Permission.checkPermission(Permission.DENY, Permission.ANY);
+ Assert.assertFalse(boo);
+
+ boo = Permission.checkPermission(Permission.DENY, Permission.PUB);
+ Assert.assertFalse(boo);
+
+ boo = Permission.checkPermission(Permission.DENY, Permission.SUB);
+ Assert.assertFalse(boo);
+
+ }
+
+ @Test(expected = AclException.class)
+ public void setTopicPermTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ Map resourcePermMap = plainAccessResource.getResourcePermMap();
+
+ Permission.parseResourcePerms(plainAccessResource, false, null);
+ Assert.assertNull(resourcePermMap);
+
+ List groups = new ArrayList<>();
+ Permission.parseResourcePerms(plainAccessResource, false, groups);
+ Assert.assertNull(resourcePermMap);
+
+ groups.add("groupA=DENY");
+ groups.add("groupB=PUB|SUB");
+ groups.add("groupC=PUB");
+ Permission.parseResourcePerms(plainAccessResource, false, groups);
+ resourcePermMap = plainAccessResource.getResourcePermMap();
+
+ byte perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA"));
+ Assert.assertEquals(perm, Permission.DENY);
+
+ perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB"));
+ Assert.assertEquals(perm,Permission.PUB|Permission.SUB);
+
+ perm = resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC"));
+ Assert.assertEquals(perm, Permission.PUB);
+
+ List topics = new ArrayList<>();
+ topics.add("topicA=DENY");
+ topics.add("topicB=PUB|SUB");
+ topics.add("topicC=PUB");
+
+ Permission.parseResourcePerms(plainAccessResource, true, topics);
+
+ perm = resourcePermMap.get("topicA");
+ Assert.assertEquals(perm, Permission.DENY);
+
+ perm = resourcePermMap.get("topicB");
+ Assert.assertEquals(perm, Permission.PUB|Permission.SUB);
+
+ perm = resourcePermMap.get("topicC");
+ Assert.assertEquals(perm, Permission.PUB);
+
+ List erron = new ArrayList<>();
+ erron.add("");
+ Permission.parseResourcePerms(plainAccessResource, false, erron);
+ }
+
+ @Test
+ public void checkAdminCodeTest() {
+ Set code = new HashSet<>();
+ code.add(17);
+ code.add(25);
+ code.add(215);
+ code.add(200);
+ code.add(207);
+
+ for (int i = 0; i < 400; i++) {
+ boolean boo = Permission.needAdminPerm(i);
+ if (boo) {
+ Assert.assertTrue(code.contains(i));
+ }
+ }
+ }
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
new file mode 100644
index 00000000000..b6f9b8ce05a
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
@@ -0,0 +1,29 @@
+package org.apache.rocketmq.acl.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class SessionCredentialsTest {
+
+ @Test
+ public void equalsTest(){
+ SessionCredentials sessionCredentials=new SessionCredentials("RocketMQ","12345678");
+ sessionCredentials.setSecurityToken("abcd");
+ SessionCredentials other=new SessionCredentials("RocketMQ","12345678","abcd");
+ Assert.assertTrue(sessionCredentials.equals(other));
+ }
+
+ @Test
+ public void updateContentTest(){
+ SessionCredentials sessionCredentials=new SessionCredentials();
+ Properties properties=new Properties();
+ properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+ properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+ properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+ sessionCredentials.updateContent(properties);
+ }
+
+
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
new file mode 100644
index 00000000000..77bbb1193f1
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PlainAccessValidatorTest {
+
+ private PlainAccessValidator plainAccessValidator;
+ private AclClientRPCHook aclClient;
+ private SessionCredentials sessionCredentials;
+ @Before
+ public void init() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ plainAccessValidator = new PlainAccessValidator();
+ sessionCredentials = new SessionCredentials();
+ sessionCredentials.setAccessKey("RocketMQ");
+ sessionCredentials.setSecretKey("12345678");
+ sessionCredentials.setSecurityToken("87654321");
+ aclClient = new AclClientRPCHook(sessionCredentials);
+ }
+
+ @Test
+ public void contentTest() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic("topicA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1");
+ String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
+
+ Assert.assertEquals(accessResource.getSignature(), signature);
+
+ }
+
+ @Test
+ public void validateTest() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic("topicB");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
+ plainAccessValidator.validate(accessResource);
+
+ }
+
+ @Test
+ public void validateSendMessageTest() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic("topicB");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateSendMessageV2Test() {
+ SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
+ messageRequestHeader.setTopic("topicC");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
+ aclClient.doBeforeRequest("", remotingCommand);
+
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validatePullMessageTest() {
+ PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
+ pullMessageRequestHeader.setTopic("topicC");
+ pullMessageRequestHeader.setConsumerGroup("consumerGroupA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,pullMessageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateConsumeMessageBackTest() {
+ ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader=new ConsumerSendMsgBackRequestHeader();
+ consumerSendMsgBackRequestHeader.setOriginTopic("topicC");
+ consumerSendMsgBackRequestHeader.setGroup("consumerGroupA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,consumerSendMsgBackRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateQueryMessageTest() {
+ QueryMessageRequestHeader queryMessageRequestHeader=new QueryMessageRequestHeader();
+ queryMessageRequestHeader.setTopic("topicC");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE,queryMessageRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateHeartBeatTest() {
+ HeartbeatData heartbeatData=new HeartbeatData();
+ Set producerDataSet=new HashSet<>();
+ Set consumerDataSet=new HashSet<>();
+ Set subscriptionDataSet=new HashSet<>();
+ ProducerData producerData=new ProducerData();
+ producerData.setGroupName("producerGroupA");
+ ConsumerData consumerData=new ConsumerData();
+ consumerData.setGroupName("consumerGroupA");
+ SubscriptionData subscriptionData=new SubscriptionData();
+ subscriptionData.setTopic("topicC");
+ producerDataSet.add(producerData);
+ consumerDataSet.add(consumerData);
+ subscriptionDataSet.add(subscriptionData);
+ consumerData.setSubscriptionDataSet(subscriptionDataSet);
+ heartbeatData.setProducerDataSet(producerDataSet);
+ heartbeatData.setConsumerDataSet(consumerDataSet);
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT,null);
+ remotingCommand.setBody(heartbeatData.encode());
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encode();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateUnRegisterClientTest() {
+ UnregisterClientRequestHeader unregisterClientRequestHeader=new UnregisterClientRequestHeader();
+ unregisterClientRequestHeader.setConsumerGroup("consumerGroupA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT,unregisterClientRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateGetConsumerListByGroupTest() {
+ GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader=new GetConsumerListByGroupRequestHeader();
+ getConsumerListByGroupRequestHeader.setConsumerGroup("consumerGroupA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP,getConsumerListByGroupRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+ @Test
+ public void validateUpdateConsumerOffSetTest() {
+ UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader=new UpdateConsumerOffsetRequestHeader();
+ updateConsumerOffsetRequestHeader.setConsumerGroup("consumerGroupA");
+ RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,updateConsumerOffsetRequestHeader);
+ aclClient.doBeforeRequest("", remotingCommand);
+ ByteBuffer buf = remotingCommand.encodeHeader();
+ buf.getInt();
+ buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
+ buf.position(0);
+ PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.0.1:9876");
+ plainAccessValidator.validate(accessResource);
+ }
+
+
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java
new file mode 100644
index 00000000000..68f6e11986e
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.Permission;
+import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PlainPermissionLoaderTest {
+
+ PlainPermissionLoader plainPermissionLoader;
+ PlainAccessResource PUBPlainAccessResource;
+ PlainAccessResource SUBPlainAccessResource;
+ PlainAccessResource ANYPlainAccessResource;
+ PlainAccessResource DENYPlainAccessResource;
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
+ Set adminCode = new HashSet<>();
+
+ @Before
+ public void init() throws NoSuchFieldException, SecurityException, IOException {
+ // UPDATE_AND_CREATE_TOPIC
+ adminCode.add(17);
+ // UPDATE_BROKER_CONFIG
+ adminCode.add(25);
+ // DELETE_TOPIC_IN_BROKER
+ adminCode.add(215);
+ // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
+ adminCode.add(200);
+ // DELETE_SUBSCRIPTIONGROUP
+ adminCode.add(207);
+
+ PUBPlainAccessResource = clonePlainAccessResource(Permission.PUB);
+ SUBPlainAccessResource = clonePlainAccessResource(Permission.SUB);
+ ANYPlainAccessResource = clonePlainAccessResource(Permission.ANY);
+ DENYPlainAccessResource = clonePlainAccessResource(Permission.DENY);
+
+ System.setProperty("java.version", "1.6.11");
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("romcketmq.acl.plain.fileName", "/conf/plain_acl.yml");
+ plainPermissionLoader = new PlainPermissionLoader();
+
+ }
+
+ public PlainAccessResource clonePlainAccessResource(byte perm) {
+ PlainAccessResource painAccessResource = new PlainAccessResource();
+ painAccessResource.setAccessKey("RocketMQ");
+ painAccessResource.setSecretKey("12345678");
+ painAccessResource.setWhiteRemoteAddress("127.0." + perm + ".*");
+ painAccessResource.setDefaultGroupPerm(perm);
+ painAccessResource.setDefaultTopicPerm(perm);
+ painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupA"), Permission.PUB);
+ painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupB"), Permission.SUB);
+ painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupC"), Permission.ANY);
+ painAccessResource.addResourceAndPerm(PlainAccessResource.getRetryTopic("groupD"), Permission.DENY);
+
+ painAccessResource.addResourceAndPerm("topicA", Permission.PUB);
+ painAccessResource.addResourceAndPerm("topicB", Permission.SUB);
+ painAccessResource.addResourceAndPerm("topicC", Permission.ANY);
+ painAccessResource.addResourceAndPerm("topicD", Permission.DENY);
+ return painAccessResource;
+ }
+
+ @Test
+ public void getPlainAccessResourceTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ PlainAccessConfig plainAccess = new PlainAccessConfig();
+
+ plainAccess.setAccessKey("RocketMQ");
+ plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
+ Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
+
+ plainAccess.setSecretKey("12345678");
+ plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
+ Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
+
+ plainAccess.setWhiteRemoteAddress("127.0.0.1");
+ plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
+ Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
+
+ plainAccess.setAdmin(true);
+ plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
+ Assert.assertEquals(plainAccessResource.isAdmin(), true);
+
+ List groups = new ArrayList();
+ groups.add("groupA=DENY");
+ groups.add("groupB=PUB|SUB");
+ groups.add("groupC=PUB");
+ plainAccess.setGroupPerms(groups);
+ plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
+ Map resourcePermMap = plainAccessResource.getResourcePermMap();
+ Assert.assertEquals(resourcePermMap.size(), 3);
+
+ Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY);
+ Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB|Permission.SUB);
+ Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB);
+
+ List topics = new ArrayList();
+ topics.add("topicA=DENY");
+ topics.add("topicB=PUB|SUB");
+ topics.add("topicC=PUB");
+ plainAccess.setTopicPerms(topics);
+ plainAccessResource = plainPermissionLoader.getPlainAccessResource(plainAccess);
+ resourcePermMap = plainAccessResource.getResourcePermMap();
+ Assert.assertEquals(resourcePermMap.size(), 6);
+
+ Assert.assertEquals(resourcePermMap.get("topicA").byteValue(), Permission.DENY);
+ Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB|Permission.SUB);
+ Assert.assertEquals(resourcePermMap.get("topicC").byteValue(), Permission.PUB);
+ }
+
+ @Test(expected = AclException.class)
+ public void checkPermAdmin() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setRequestCode(17);
+ plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
+ }
+
+ @Test
+ public void checkPerm() {
+
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
+ plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
+ plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
+ plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
+
+ plainAccessResource = new PlainAccessResource();
+ plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
+ plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
+ plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
+ plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
+
+ }
+
+ @Test(expected = AclException.class)
+ public void accountNullTest() {
+ plainAccessResource.setAccessKey(null);
+ plainPermissionLoader.addPlainAccessResource(plainAccessResource);
+ }
+
+ @Test(expected = AclException.class)
+ public void accountThanTest() {
+ plainAccessResource.setAccessKey("123");
+ plainPermissionLoader.addPlainAccessResource(plainAccessResource);
+ }
+
+ @Test(expected = AclException.class)
+ public void passWordtNullTest() {
+ plainAccessResource.setAccessKey(null);
+ plainPermissionLoader.addPlainAccessResource(plainAccessResource);
+ }
+
+ @Test(expected = AclException.class)
+ public void passWordThanTest() {
+ plainAccessResource.setAccessKey("123");
+ plainPermissionLoader.addPlainAccessResource(plainAccessResource);
+ }
+
+ @Test(expected = AclException.class)
+ public void testPlainAclPlugEngineInit() {
+ System.setProperty("rocketmq.home.dir", "");
+ new PlainPermissionLoader().initialize();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void cleanAuthenticationInfoTest() throws IllegalAccessException {
+ //plainPermissionLoader.addPlainAccessResource(plainAccessResource);
+ Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ Assert.assertFalse(plainAccessResourceMap.isEmpty());
+
+ plainPermissionLoader.clearPermissionInfo();
+ plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ Assert.assertTrue(plainAccessResourceMap.isEmpty());
+ }
+
+ @Test
+ public void isWatchStartTest() {
+ System.setProperty("java.version", "1.7.11");
+ PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
+ Assert.assertTrue(plainPermissionLoader.isWatchStart());
+ System.setProperty("java.version", "1.6.11");
+ plainPermissionLoader = new PlainPermissionLoader();
+ Assert.assertFalse(plainPermissionLoader.isWatchStart());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void watchTest() throws IOException, IllegalAccessException {
+ System.setProperty("java.version", "1.7.11");
+ System.setProperty("rocketmq.home.dir", "src/test/resources/watch");
+ File file = new File("src/test/resources/watch/conf");
+ file.mkdirs();
+ File transport = new File("src/test/resources/watch/conf/plain_acl.yml");
+ transport.delete();
+ transport.createNewFile();
+
+ FileWriter writer = new FileWriter(transport);
+ writer.write("accounts:\r\n");
+ writer.write("- accessKey: rokcetmq\r\n");
+ writer.write(" secretKey: aliyun11\r\n");
+ writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
+ writer.write(" admin: true\r\n");
+ writer.flush();
+ writer.close();
+ PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
+
+ Map> plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ Assert.assertNotNull(plainAccessResourceMap.get("rokcetmq"));
+
+ writer = new FileWriter(new File("src/test/resources/watch/conf/plain_acl.yml"), true);
+ writer.write("- accessKey: rokcet1\r\n");
+ writer.write(" secretKey: aliyun1\r\n");
+ writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
+ writer.write(" admin: true\r\n");
+ writer.flush();
+ writer.close();
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ plainAccessResourceMap = (Map>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ Assert.assertNotNull(plainAccessResourceMap.get("rokcet1"));
+
+ }
+
+ @Test(expected = AclException.class)
+ public void initializeTest() {
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
+ new PlainPermissionLoader();
+
+ }
+
+}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java
new file mode 100644
index 00000000000..53391f41186
--- /dev/null
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plain;
+
+import org.apache.rocketmq.acl.common.AclException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RemoteAddressStrategyTest {
+
+ RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
+
+ @Test
+ public void netaddressStrategyFactoryExceptionTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource).getClass(),
+ RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
+ }
+
+ @Test
+ public void netaddressStrategyFactoryTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+
+ plainAccessResource.setWhiteRemoteAddress("*");
+ RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy, RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.OneRemoteAddressStrategy.class);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.MultipleRemoteAddressStrategy.class);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.1-20.*");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.RangeRemoteAddressStrategy.class);
+
+ plainAccessResource.setWhiteRemoteAddress("");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ Assert.assertEquals(remoteAddressStrategy.getClass(), RemoteAddressStrategyFactory.BlankRemoteAddressStrategy.class);
+ }
+
+ @Test(expected = AclException.class)
+ public void verifyTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("256.0.0.1");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ }
+
+ @Test
+ public void nullNetaddressStrategyTest() {
+ boolean isMatch = RemoteAddressStrategyFactory.NULL_NET_ADDRESS_STRATEGY.match(new PlainAccessResource());
+ Assert.assertTrue(isMatch);
+ }
+
+ @Test
+ public void blankNetaddressStrategyTest() {
+ boolean isMatch = RemoteAddressStrategyFactory.BLANK_NET_ADDRESS_STRATEGY.match(new PlainAccessResource());
+ Assert.assertFalse(isMatch);
+ }
+
+ public void oneNetaddressStrategyTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
+ RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("");
+ boolean match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertFalse(match);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
+ match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertFalse(match);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
+ match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertTrue(match);
+ }
+
+ @Test
+ public void multipleNetaddressStrategyTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1,127.0.0.2,127.0.0.3");
+ RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ multipleNetaddressStrategyTest(remoteAddressStrategy);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.{1,2,3}");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ multipleNetaddressStrategyTest(remoteAddressStrategy);
+
+ }
+
+ @Test(expected = AclException.class)
+ public void multipleNetaddressStrategyExceptionTest() {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1,2,3}");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ }
+
+ private void multipleNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1");
+ boolean match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertTrue(match);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.2");
+ match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertTrue(match);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.3");
+ match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertTrue(match);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.4");
+ match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertFalse(match);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.0");
+ match = remoteAddressStrategy.match(plainAccessResource);
+ Assert.assertFalse(match);
+
+ }
+
+ @Test
+ public void rangeNetaddressStrategyTest() {
+ String head = "127.0.0.";
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.1-200");
+ RemoteAddressStrategy remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ rangeNetaddressStrategyTest(remoteAddressStrategy, head, 1, 200, true);
+ plainAccessResource.setWhiteRemoteAddress("127.0.0.*");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ rangeNetaddressStrategyTest(remoteAddressStrategy, head, 0, 255, true);
+
+ plainAccessResource.setWhiteRemoteAddress("127.0.1-200.*");
+ remoteAddressStrategy = remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ rangeNetaddressStrategyThirdlyTest(remoteAddressStrategy, head, 1, 200);
+ }
+
+ private void rangeNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
+ int end,
+ boolean isFalse) {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ for (int i = -10; i < 300; i++) {
+ plainAccessResource.setWhiteRemoteAddress(head + i);
+ boolean match = remoteAddressStrategy.match(plainAccessResource);
+ if (isFalse && i >= start && i <= end) {
+ Assert.assertTrue(match);
+ continue;
+ }
+ Assert.assertFalse(match);
+
+ }
+ }
+
+ private void rangeNetaddressStrategyThirdlyTest(RemoteAddressStrategy remoteAddressStrategy, String head, int start,
+ int end) {
+ String newHead;
+ for (int i = -10; i < 300; i++) {
+ newHead = head + i;
+ if (i >= start && i <= end) {
+ rangeNetaddressStrategyTest(remoteAddressStrategy, newHead, 0, 255, false);
+ }
+ }
+ }
+
+ @Test(expected = AclException.class)
+ public void rangeNetaddressStrategyExceptionStartGreaterEndTest() {
+ rangeNetaddressStrategyExceptionTest("127.0.0.2-1");
+ }
+
+ @Test(expected = AclException.class)
+ public void rangeNetaddressStrategyExceptionScopeTest() {
+ rangeNetaddressStrategyExceptionTest("127.0.0.-1-200");
+ }
+
+ @Test(expected = AclException.class)
+ public void rangeNetaddressStrategyExceptionScopeTwoTest() {
+ rangeNetaddressStrategyExceptionTest("127.0.0.0-256");
+ }
+
+ private void rangeNetaddressStrategyExceptionTest(String netaddress) {
+ PlainAccessResource plainAccessResource = new PlainAccessResource();
+ plainAccessResource.setWhiteRemoteAddress(netaddress);
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ }
+
+}
diff --git a/acl/src/test/resources/conf/plain_acl.yml b/acl/src/test/resources/conf/plain_acl.yml
new file mode 100644
index 00000000000..5daefb67c33
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## suggested format
+
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.0.*
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: SUB
+ topicPerms:
+ - topicA=DENY
+ - topicB=PUB|SUB
+ - topicC=SUB
+ groupPerms:
+ # the group should convert to retry topic
+ - groupA=DENY
+ - groupB=SUB
+ - groupC=SUB
+
+- accessKey: aliyun.com
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.1.*
+ # if it is admin, it could access all resources
+ admin: true
+
diff --git a/acl/src/test/resources/conf/plain_acl_null.yml b/acl/src/test/resources/conf/plain_acl_null.yml
new file mode 100644
index 00000000000..bc30380c888
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_null.yml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## suggested format
+
+
diff --git a/broker/pom.xml b/broker/pom.xml
index f10ae53730e..f617d2492d1 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -1,21 +1,17 @@
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-
+
org.apache.rocketmq
rocketmq-all
@@ -52,6 +48,10 @@
${project.groupId}
rocketmq-filter
+
+ ${project.groupId}
+ rocketmq-acl
+
ch.qos.logback
logback-classic
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e7ef46d0c61..73ed7eb4caa 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -31,6 +31,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -91,6 +92,7 @@
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
@@ -157,6 +159,7 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
+
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
@@ -467,6 +470,8 @@ private void reloadServerSslContext() {
}
}
initialTransaction();
+ initialAcl();
+ initialRpcHooks();
}
return result;
}
@@ -486,6 +491,47 @@ private void initialTransaction() {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
+ private void initialAcl() {
+ if (!this.brokerConfig.isEnableAcl()) {
+ log.info("The broker dose not enable acl");
+ return;
+ }
+
+ List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
+ if (accessValidators == null || accessValidators.isEmpty()) {
+ log.info("The broker dose not load the AccessValidator");
+ return;
+ }
+
+ for (AccessValidator accessValidator: accessValidators) {
+ final AccessValidator validator = accessValidator;
+ this.registerServerRPCHook(new RPCHook() {
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+ //Do not catch the exception
+ validator.validate(validator.parse(request, remoteAddr));
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
+ }
+ });
+ }
+ }
+
+
+ private void initialRpcHooks() {
+
+ List rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
+ if (rpcHooks == null || rpcHooks.isEmpty()) {
+ return;
+ }
+ for (RPCHook rpcHook: rpcHooks) {
+ this.registerServerRPCHook(rpcHook);
+ }
+ }
+
public void registerProcessor() {
/**
* SendMessageProcessor
@@ -989,6 +1035,7 @@ public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
public void registerServerRPCHook(RPCHook rpcHook) {
getRemotingServer().registerRPCHook(rpcHook);
+ this.fastRemotingServer.registerRPCHook(rpcHook);
}
public RemotingServer getRemotingServer() {
@@ -1049,7 +1096,9 @@ public void setTransactionalMessageCheckListener(
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
+
public BlockingQueue getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
+
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 8b9b63e4dcb..e679660104d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -34,6 +34,14 @@ public class ServiceProvider {
public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
+
+ public static final String RPC_HOOK_ID = "META-INF/service/org.apache.rocketmq.remoting.RPCHook";
+
+
+ public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";
+
+
+
static {
thisClassLoader = getClassLoader(ServiceProvider.class);
}
diff --git a/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
new file mode 100644
index 00000000000..1abc92e0162
--- /dev/null
+++ b/broker/src/main/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
@@ -0,0 +1 @@
+org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 56abf084a7c..dae1335540b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -24,6 +24,7 @@
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
+import org.junit.Ignore;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
index 22228a6e0ef..a3a35c8832d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
@@ -17,12 +17,15 @@
package org.apache.rocketmq.broker.util;
+import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
+
public class ServiceProviderTest {
@Test
@@ -38,4 +41,10 @@ public void loadAbstractTransactionListenerTest() {
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
+
+ @Test
+ public void loadAccessValidatorTest() {
+ List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
+ assertThat(accessValidators).isNotNull();
+ }
}
diff --git a/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
new file mode 100644
index 00000000000..1abc92e0162
--- /dev/null
+++ b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
@@ -0,0 +1 @@
+org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 11c1fcb9177..9823ca04799 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -174,6 +174,23 @@ public class BrokerConfig {
@ImportantField
private long transactionCheckInterval = 60 * 1000;
+ /**
+ * Acl feature switch
+ */
+ @ImportantField
+ private boolean enableAcl = false;
+
+
+ public static String localHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ log.error("Failed to obtain the host name", e);
+ }
+
+ return "DEFAULT_BROKER";
+ }
+
public boolean isTraceOn() {
return traceOn;
}
@@ -238,16 +255,6 @@ public void setSlaveReadEnable(final boolean slaveReadEnable) {
this.slaveReadEnable = slaveReadEnable;
}
- public static String localHostName() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- log.error("Failed to obtain the host name", e);
- }
-
- return "DEFAULT_BROKER";
- }
-
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
@@ -712,6 +719,14 @@ public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
+ public boolean isEnableAcl() {
+ return enableAcl;
+ }
+
+ public void setEnableAcl(boolean isAclPlug) {
+ this.enableAcl = isAclPlug;
+ }
+
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index a846755d8db..dee6ca29114 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -60,6 +60,18 @@ public static int getPid() {
}
}
+ public static void sleep(long sleepMs) {
+ if (sleepMs < 0) {
+ return;
+ }
+ try {
+ Thread.sleep(sleepMs);
+ } catch (Throwable ignored) {
+
+ }
+
+ }
+
public static String currentStackTrace() {
StringBuilder sb = new StringBuilder();
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index fe0ae9f1713..a463a4ed7c1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -37,4 +37,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+ public static final String ACL_PLUG_LOGGER_NAME = "RocketmqAclPlug";
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
index 0d2dec6fa5c..8d86544be69 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
@@ -17,14 +17,13 @@
package org.apache.rocketmq.common;
-import org.junit.Test;
-
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/distribution/conf/broker.conf b/distribution/conf/broker.conf
index 0c0b28b7b8e..970395735d7 100644
--- a/distribution/conf/broker.conf
+++ b/distribution/conf/broker.conf
@@ -20,3 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
+enableAcl=true
+namesrvAddr=127.0.0.1:9876
diff --git a/distribution/conf/plain_acl.yml b/distribution/conf/plain_acl.yml
new file mode 100644
index 00000000000..9043b0dd800
--- /dev/null
+++ b/distribution/conf/plain_acl.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+globalWhiteRemoteAddresses:
+
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress:
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: SUB
+ topicPerms:
+ - topicA=DENY
+ - topicB=PUB|SUB
+ - topicC=SUB
+ groupPerms:
+ # the group should convert to retry topic
+ - groupA=DENY
+ - groupB=PUB|SUB
+ - groupC=SUB
+
+- accessKey: aliyun.com
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.1.*
+ # if it is admin, it could access all resources
+ admin: true
+
diff --git a/distribution/conf/tools.yml b/distribution/conf/tools.yml
new file mode 100644
index 00000000000..b9096967082
--- /dev/null
+++ b/distribution/conf/tools.yml
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+accessKey: aliyun.com
+secretKey: 12345678
+
diff --git a/example/pom.xml b/example/pom.xml
index 28dfe922fb1..1a4065770b6 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -53,5 +53,10 @@
rocketmq-openmessaging
4.4.0-SNAPSHOT
+
+ org.apache.rocketmq
+ rocketmq-acl
+ 4.4.0-SNAPSHOT
+
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
new file mode 100644
index 00000000000..898051704bf
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+/**
+ *
+ * 1. view the /conf/plain_acl.yml file under the distribution module, pay attention to the accessKey,secretKey,
+ * globalWhiteRemoteAddresses and whiteRemoteAddress and some other attributes.
+ *
+ * 2. Modify ACL_ACCESS_KEY and ACL_SECRET_KEY to the corresponding accessKey and secretKey in plain_acl.yml
+ *
+ */
+public class AclClient {
+
+ private static final Map OFFSE_TABLE = new HashMap();
+
+ private static final String ACL_ACCESS_KEY = "RocketMQ";
+
+ private static final String ACL_SECRET_KEY = "1234567";
+
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ producer();
+ pushConsumer();
+ pullConsumer();
+ }
+
+ public static void producer() throws MQClientException {
+ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
+ producer.setNamesrvAddr("127.0.0.1:9876");
+ producer.start();
+
+ for (int i = 0; i < 128; i++)
+ try {
+ {
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+
+ public static void pushConsumer() throws MQClientException {
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.subscribe("TopicTest", "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ //wrong time format 2017_0422_221800
+ consumer.setConsumeTimestamp("20180422221800");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ printBody(msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+
+ public static void pullConsumer() throws MQClientException {
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.start();
+
+ Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
+ for (MessageQueue mq : mqs) {
+ System.out.printf("Consume from the queue: %s%n", mq);
+ SINGLE_MQ:
+ while (true) {
+ try {
+ PullResult pullResult =
+ consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
+ System.out.printf("%s%n", pullResult);
+ putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+ printBody(pullResult);
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ break;
+ case NO_MATCHED_MSG:
+ break;
+ case NO_NEW_MSG:
+ break SINGLE_MQ;
+ case OFFSET_ILLEGAL:
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ consumer.shutdown();
+ }
+
+ private static void printBody(PullResult pullResult) {
+ printBody(pullResult.getMsgFoundList());
+ }
+
+ private static void printBody(List msg) {
+ if (msg == null || msg.size() == 0)
+ return;
+ for (MessageExt m : msg) {
+ if (m != null) {
+ System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
+ }
+ }
+ }
+
+ private static long getMessageQueueOffset(MessageQueue mq) {
+ Long offset = OFFSE_TABLE.get(mq);
+ if (offset != null)
+ return offset;
+
+ return 0;
+ }
+
+ private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+ OFFSE_TABLE.put(mq, offset);
+ }
+
+ static RPCHook getAclRPCHook() {
+ return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index efffa36d59d..8aec7e30934 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -29,10 +29,10 @@ public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
-
+ consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
- Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
+ Set mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
index 16108b8c6a8..f12595a903b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumerTest.java
@@ -24,6 +24,7 @@
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+ consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
diff --git a/pom.xml b/pom.xml
index 0a8fef87584..c20b04cb366 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,8 @@
limitations under the License.
-->
-
+
org.apache
@@ -125,6 +126,7 @@
distribution
openmessaging
logging
+ acl
@@ -157,7 +159,7 @@
-
+
true
@@ -214,9 +216,9 @@
generate-effective-dependencies-pom
generate-resources
-
+
@@ -521,6 +523,11 @@
rocketmq-example
${project.version}
+
+ ${project.groupId}
+ rocketmq-acl
+ ${project.version}
+
org.slf4j
slf4j-api
@@ -581,6 +588,16 @@
log4j
1.2.17
+
+ org.yaml
+ snakeyaml
+ 1.19
+
+
+ commons-codec
+ commons-codec
+ 1.9
+
org.apache.logging.log4j
log4j-core
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 8dccebc0457..28ae001b7f3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -23,6 +23,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -35,6 +36,8 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -45,8 +48,6 @@
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
@@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract {
*/
protected volatile SslContext sslContext;
+ /**
+ * custom rpc hooks
+ */
+ protected List rpcHooks = new ArrayList();
+
+
+
static {
NettyLogger.initNettyLogger();
}
@@ -158,6 +166,23 @@ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand ms
}
}
+ protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
+ if (rpcHooks.size() > 0) {
+ for (RPCHook rpcHook: rpcHooks) {
+ rpcHook.doBeforeRequest(addr, request);
+ }
+ }
+ }
+
+ protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
+ if (rpcHooks.size() > 0) {
+ for (RPCHook rpcHook: rpcHooks) {
+ rpcHook.doAfterResponse(addr, request, response);
+ }
+ }
+ }
+
+
/**
* Process incoming request command issued by remote peer.
*
@@ -174,15 +199,9 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin
@Override
public void run() {
try {
- RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
- if (rpcHook != null) {
- rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
- }
-
+ doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
- if (rpcHook != null) {
- rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
- }
+ doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
@@ -314,12 +333,29 @@ public void run() {
}
}
+
+
/**
* Custom RPC hook.
+ * Just be compatible with the previous version, use getRPCHooks instead.
+ */
+ @Deprecated
+ protected RPCHook getRPCHook() {
+ if (rpcHooks.size() > 0) {
+ return rpcHooks.get(0);
+ }
+ return null;
+ }
+
+ /**
+ * Custom RPC hooks.
*
- * @return RPC hook if specified; null otherwise.
+ * @return RPC hooks if specified; null otherwise.
*/
- public abstract RPCHook getRPCHook();
+ public List getRPCHooks() {
+ return rpcHooks;
+ }
+
/**
* This method specifies thread pool to use while invoking callback methods.
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 33c2eed8de1..fc9df37c652 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -53,6 +53,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -64,8 +66,6 @@
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -94,7 +94,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
- private RPCHook rpcHook;
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
@@ -283,7 +282,9 @@ public void closeChannel(final String addr, final Channel channel) {
@Override
public void registerRPCHook(RPCHook rpcHook) {
- this.rpcHook = rpcHook;
+ if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
+ rpcHooks.add(rpcHook);
+ }
}
public void closeChannel(final Channel channel) {
@@ -357,6 +358,8 @@ public void updateNameServerAddressList(List addrs) {
}
}
+
+
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
@@ -364,17 +367,13 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
+ doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
- if (this.rpcHook != null) {
- this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
- }
+ doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
@@ -522,9 +521,7 @@ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
+ doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
@@ -547,9 +544,7 @@ public void invokeOneway(String addr, RemotingCommand request, long timeoutMilli
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
+ doBeforeRpcHooks(addr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
@@ -592,10 +587,6 @@ public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}
- @Override
- public RPCHook getRPCHook() {
- return this.rpcHook;
- }
@Override
public ExecutorService getCallbackExecutor() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 198484251c0..c2f3ba48d0e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -47,6 +47,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -58,8 +60,6 @@
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
@@ -75,7 +75,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
- private RPCHook rpcHook;
private int port = 0;
@@ -266,7 +265,9 @@ public void shutdown() {
@Override
public void registerRPCHook(RPCHook rpcHook) {
- this.rpcHook = rpcHook;
+ if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
+ rpcHooks.add(rpcHook);
+ }
}
@Override
@@ -318,10 +319,6 @@ public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}
- @Override
- public RPCHook getRPCHook() {
- return this.rpcHook;
- }
@Override
public ExecutorService getCallbackExecutor() {
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
index 066d36cedd9..ea8047a2ff1 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
@@ -35,8 +35,8 @@ public static Options buildCommandlineOptions(final Options options) {
new Option("n", "namesrvAddr", true,
"Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
opt.setRequired(false);
- options.addOption(opt);
-
+ options.addOption(opt);
+
return options;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 5027a3cce07..a05a55a06b3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -19,9 +19,13 @@
import java.util.ArrayList;
import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -48,6 +52,7 @@ public class BaseConf {
private static Logger log = Logger.getLogger(BaseConf.class);
static {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
diff --git a/tools/pom.xml b/tools/pom.xml
index dc0e256ed46..a4a8630b138 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -15,7 +15,8 @@
limitations under the License.
-->
-
+
org.apache.rocketmq
rocketmq-all
@@ -36,6 +37,10 @@
${project.groupId}
rocketmq-client
+
+ ${project.groupId}
+ rocketmq-acl
+
${project.groupId}
rocketmq-store
@@ -60,5 +65,9 @@
org.apache.commons
commons-lang3
+
+ org.yaml
+ snakeyaml
+
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 6a51b7b4b92..065e4175df7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -19,13 +19,16 @@
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
-
import java.util.ArrayList;
import java.util.List;
-
+import com.alibaba.fastjson.JSONObject;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclUtils;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
@@ -129,7 +132,7 @@ public static void main0(String[] args, RPCHook rpcHook) {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
- cmd.execute(commandLine, options, rpcHook);
+ cmd.execute(commandLine, options, getAclRPCHook(commandLine));
} else {
System.out.printf("The sub command %s not exist.%n", args[0]);
}
@@ -157,7 +160,7 @@ public static void initCommand() {
initCommand(new QueryMsgByKeySubCommand());
initCommand(new QueryMsgByUniqueKeySubCommand());
initCommand(new QueryMsgByOffsetSubCommand());
-
+
initCommand(new PrintMessageSubCommand());
initCommand(new PrintMessageByQueueCommand());
initCommand(new SendMsgStatusCommand());
@@ -211,7 +214,6 @@ private static void initLogback() throws JoranException {
private static void printHelp() {
System.out.printf("The most commonly used mqadmin commands are:%n");
-
for (SubCommand cmd : subCommandList) {
System.out.printf(" %-20s %s%n", cmd.commandName(), cmd.commandDesc());
}
@@ -243,4 +245,25 @@ private static String[] parseSubArgs(String[] args) {
public static void initCommand(SubCommand command) {
subCommandList.add(command);
}
+
+ public static RPCHook getAclRPCHook(CommandLine commandLine) {
+ String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ String fileName = "/conf/tools.yml";
+ JSONObject yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName ,
+ JSONObject.class);
+
+ if (yamlDataObject == null || yamlDataObject.isEmpty()) {
+ System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName);
+ return null;
+ }
+ // admin ak sk
+ String accessKey = yamlDataObject.getString("accessKey");
+ String secretKey = yamlDataObject.getString("secretKey");
+
+ if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
+ System.out.printf("AccessKey or secretKey is blank, the acl is not enabled.%n");
+ return null;
+ }
+ return new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
+ }
}