From 3ab2330d829a47a87d95bbdf00f93a6b618abfde Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 15 May 2019 19:18:43 +0800 Subject: [PATCH] some rename --- ... => SnapshotScannerHDFSAclController.java} | 81 +++++++++++-------- ...java => SnapshotScannerHDFSAclHelper.java} | 55 +++++++------ ...TestSnapshotScannerHDFSAclController.java} | 34 +++++--- 3 files changed, 97 insertions(+), 73 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/{HDFSAclController.java => SnapshotScannerHDFSAclController.java} (86%) rename hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/{HDFSAclHelper.java => SnapshotScannerHDFSAclHelper.java} (93%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/{TestHDFSAclController.java => TestSnapshotScannerHDFSAclController.java} (95%) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HDFSAclController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java similarity index 86% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HDFSAclController.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java index fa9271465650..f2967270dc6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HDFSAclController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; @@ -56,8 +57,8 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.security.access.HDFSAclHelper.PathHelper; import org.apache.hadoop.hbase.security.access.Permission.Action; +import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper.PathHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -94,10 +95,10 @@ */ @CoreCoprocessor @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class HDFSAclController implements MasterCoprocessor, MasterObserver { - private static final Logger LOG = LoggerFactory.getLogger(HDFSAclController.class); +public class SnapshotScannerHDFSAclController implements MasterCoprocessor, MasterObserver { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclController.class); - private HDFSAclHelper hdfsAclHelper = null; + private SnapshotScannerHDFSAclHelper hdfsAclHelper = null; private PathHelper pathHelper = null; private FileSystem fs = null; /** Provider for mapping principal names to Users */ @@ -117,8 +118,8 @@ public void preMasterInitialization(final ObserverContext ctx) if (admin.tableExists(PermissionStorage.ACL_TABLE_NAME)) { // check if hbase:acl table has 'm' CF TableDescriptor tableDescriptor = admin.getDescriptor(PermissionStorage.ACL_TABLE_NAME); - boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()) - .anyMatch(family -> Bytes.equals(family.getName(), HDFSAclStorage.HDFS_ACL_FAMILY)); + boolean containHdfsAclFamily = Arrays.stream(tableDescriptor.getColumnFamilies()).anyMatch( + family -> Bytes.equals(family.getName(), SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY)); if (!containHdfsAclFamily) { - TableDescriptorBuilder builder = - TableDescriptorBuilder.newBuilder(tableDescriptor).setColumnFamily( - ColumnFamilyDescriptorBuilder.newBuilder(HDFSAclStorage.HDFS_ACL_FAMILY).build()); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor) + .setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(SnapshotScannerHDFSAclStorage.HDFS_ACL_FAMILY).build()); admin.modifyTable(builder.build()); } + } else { + LOG.error("Table {} is not yet created. {} should be configured after the {} Coprocessor", + PermissionStorage.ACL_TABLE_NAME, getClass().getSimpleName(), + AccessController.class.getSimpleName()); + throw new TableNotFoundException( + "Table " + PermissionStorage.ACL_TABLE_NAME + " is not yet created"); } } this.userProvider = UserProvider.instantiate(ctx.getEnvironment().getConfiguration()); @@ -171,7 +178,7 @@ public void postCompletedCreateTableAction(ObserverContext ctx, Set removeUsers = new HashSet<>(); try (Table aclTable = ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) { - List users = HDFSAclStorage.getTableUsers(aclTable, tableName); - HDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName); + List users = SnapshotScannerHDFSAclStorage.getTableUsers(aclTable, tableName); + SnapshotScannerHDFSAclStorage.deleteTableHdfsAcl(aclTable, tableName); for (String user : users) { - List userEntries = HDFSAclStorage.getUserEntries(aclTable, user); + List userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, user); boolean remove = true; for (byte[] entry : userEntries) { if (PermissionStorage.isGlobalEntry(entry)) { @@ -253,7 +260,7 @@ public void postDeleteNamespace(ObserverContext ct if (hdfsAclHelper != null) { try (Table aclTable = ctx.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) { - HDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, namespace); + SnapshotScannerHDFSAclStorage.deleteNamespaceHdfsAcl(aclTable, namespace); } Path tmpNsDir = pathHelper.getTmpNsDir(namespace); if (fs.exists(tmpNsDir)) { @@ -279,7 +286,8 @@ public void postGrant(ObserverContext c, case GLOBAL: UserPermission perm = getUserGlobalPermission(conf, userName); if (containReadPermission(perm)) { - List userEntries = HDFSAclStorage.getUserEntries(aclTable, userName); + List userEntries = + SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName); Set skipNamespaces = new HashSet<>(); Set skipTables = new HashSet<>(); for (byte[] entry : userEntries) { @@ -290,7 +298,7 @@ public void postGrant(ObserverContext c, } } hdfsAclHelper.grantAcl(userPermission, skipNamespaces, skipTables); - HDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName); + SnapshotScannerHDFSAclStorage.addUserGlobalHdfsAcl(aclTable, userName); } else { revokeUserGlobalPermission(aclTable, userName, userPermission); } @@ -300,8 +308,9 @@ public void postGrant(ObserverContext c, ((NamespacePermission) userPermission.getPermission()).getNamespace(); UserPermission nsPerm = getUserNamespacePermission(conf, userName, namespace); if (containReadPermission(nsPerm)) { - if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) { - List userEntries = HDFSAclStorage.getUserEntries(aclTable, userName); + if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) { + List userEntries = + SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName); Set skipTables = new HashSet<>(); for (byte[] entry : userEntries) { if (!PermissionStorage.isNamespaceEntry(entry) @@ -311,7 +320,7 @@ public void postGrant(ObserverContext c, } hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), skipTables); } - HDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace); + SnapshotScannerHDFSAclStorage.addUserNamespaceHdfsAcl(aclTable, userName, namespace); } else { revokeUserNamespacePermission(aclTable, userName, namespace, userPermission); } @@ -324,11 +333,12 @@ public void postGrant(ObserverContext c, break; } if (containReadPermission(tPerm)) { - if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName) && !HDFSAclStorage - .hasUserNamespaceHdfsAcl(aclTable, userName, tableName.getNamespaceAsString())) { + if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName) + && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, + tableName.getNamespaceAsString())) { hdfsAclHelper.grantAcl(userPermission, new HashSet<>(0), new HashSet<>(0)); } - HDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName); + SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(aclTable, userName, tableName); } else { revokeUserTablePermission(aclTable, userName, tableName, userPermission); } @@ -378,7 +388,7 @@ private void revokeUserGlobalPermission(Table aclTable, String userName, // remove user global acls but reserve ns and table acls Set skipNamespaces = new HashSet<>(); Set skipTables = new HashSet<>(); - List userEntries = HDFSAclStorage.getUserEntries(aclTable, userName); + List userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName); for (byte[] entry : userEntries) { if (PermissionStorage.isNamespaceEntry(entry)) { skipNamespaces.add(Bytes.toString(PermissionStorage.fromNamespaceEntry(entry))); @@ -393,15 +403,15 @@ private void revokeUserGlobalPermission(Table aclTable, String userName, } } hdfsAclHelper.revokeAcl(userPermission, skipNamespaces, filterTableNames); - HDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName); + SnapshotScannerHDFSAclStorage.deleteUserGlobalHdfsAcl(aclTable, userName); } private void revokeUserNamespacePermission(Table aclTable, String userName, String namespace, UserPermission userPermission) throws IOException { // remove user ns acls but reserve table acls - if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) { + if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName)) { Set skipTables = new HashSet<>(); - List userEntries = HDFSAclStorage.getUserEntries(aclTable, userName); + List userEntries = SnapshotScannerHDFSAclStorage.getUserEntries(aclTable, userName); for (byte[] entry : userEntries) { if (!PermissionStorage.isNamespaceEntry(entry) && !PermissionStorage.isGlobalEntry(entry)) { skipTables.add(TableName.valueOf(entry)); @@ -409,17 +419,18 @@ private void revokeUserNamespacePermission(Table aclTable, String userName, Stri } hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(), skipTables); } - HDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace); + SnapshotScannerHDFSAclStorage.deleteUserNamespaceHdfsAcl(aclTable, userName, namespace); } private void revokeUserTablePermission(Table aclTable, String userName, TableName tableName, UserPermission userPermission) throws IOException { - if (!HDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName) && !HDFSAclStorage - .hasUserNamespaceHdfsAcl(aclTable, userName, tableName.getNamespaceAsString())) { + if (!SnapshotScannerHDFSAclStorage.hasUserGlobalHdfsAcl(aclTable, userName) + && !SnapshotScannerHDFSAclStorage.hasUserNamespaceHdfsAcl(aclTable, userName, + tableName.getNamespaceAsString())) { // remove table acls hdfsAclHelper.revokeAcl(userPermission, new HashSet<>(0), new HashSet<>(0)); } - HDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName); + SnapshotScannerHDFSAclStorage.deleteUserTableHdfsAcl(aclTable, userName, tableName); } private boolean containReadPermission(UserPermission userPermission) { @@ -461,11 +472,11 @@ private UserPermission getUserTablePermission(Configuration conf, String userNam } private boolean isHdfsAclEnabled(Configuration configuration) { - return configuration.getBoolean(HDFSAclHelper.HDFS_ACL_ENABLE, false); + return configuration.getBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, false); } - protected static final class HDFSAclStorage { - public static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m"); + protected static final class SnapshotScannerHDFSAclStorage { + static final byte[] HDFS_ACL_FAMILY = Bytes.toBytes("m"); private static final byte[] HDFS_ACL_VALUE = Bytes.toBytes("R"); static void addUserGlobalHdfsAcl(Table aclTable, String user) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HDFSAclHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java similarity index 93% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HDFSAclHelper.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java index cadf3728145c..5bacb6cc2df3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HDFSAclHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS; import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import static org.apache.hadoop.fs.permission.AclEntryType.GROUP; import static org.apache.hadoop.fs.permission.AclEntryType.USER; import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE; import static org.apache.hadoop.hbase.security.access.Permission.Action.READ; @@ -44,6 +45,7 @@ import org.apache.hadoop.fs.permission.AclEntryScope; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -64,21 +66,24 @@ * A helper to set HBase granted user access acl and default acl over hFiles. */ @InterfaceAudience.Private -public class HDFSAclHelper implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(HDFSAclHelper.class); +public class SnapshotScannerHDFSAclHelper implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class); - public static final String HDFS_ACL_ENABLE = "hbase.hdfs.acl.enable"; - public static final String HDFS_ACL_THREAD_NUMBER = "hbase.hdfs.acl.thread.number"; + public static final String USER_SCAN_SNAPSHOT_ENABLE = "hbase.user.scan.snapshot.enable"; + public static final String USER_SCAN_SNAPSHOT_THREAD_NUMBER = + "hbase.user.scan.snapshot.thread.number"; // the tmp directory to restore snapshot, it can not be a sub directory of HBase root dir public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir"; public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT = "/hbase/.tmpdir-to-restore-snapshot"; // If enable this feature, set public directories permission to 751 - public static final FsPermission ACL_ENABLE_PUBLIC_HFILE_PERMISSION = - new FsPermission((short) 0751); + public static final String COMMON_DIRECTORY_PERMISSION = + "hbase.user.scan.snapshot.common.directory.permission"; + public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751"; // If enable this feature, set restore directory permission to 703 - public static final FsPermission ACL_ENABLE_RESTORE_HFILE_PERMISSION = - new FsPermission((short) 0703); + public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION = + "hbase.user.scan.snapshot.restore.directory.permission"; + public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "703"; private Admin admin; private final Configuration conf; @@ -86,13 +91,13 @@ public class HDFSAclHelper implements Closeable { private PathHelper pathHelper; private ExecutorService pool; - public HDFSAclHelper(Configuration configuration, Connection connection) + public SnapshotScannerHDFSAclHelper(Configuration configuration, Connection connection) throws IOException { this.conf = configuration; this.pathHelper = new PathHelper(conf); this.fs = pathHelper.getFileSystem(); this.pathHelper = new PathHelper(conf); - this.pool = Executors.newFixedThreadPool(conf.getInt(HDFS_ACL_THREAD_NUMBER, 10), + this.pool = Executors.newFixedThreadPool(conf.getInt(USER_SCAN_SNAPSHOT_THREAD_NUMBER, 10), new ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build()); if (connection == null) { connection = ConnectionFactory.createConnection(conf); @@ -122,14 +127,16 @@ public void setCommonDirPermission() throws IOException { if (!fs.exists(path)) { fs.mkdirs(path); } - fs.setPermission(path, ACL_ENABLE_PUBLIC_HFILE_PERMISSION); + fs.setPermission(path, new FsPermission( + conf.get(COMMON_DIRECTORY_PERMISSION, COMMON_DIRECTORY_PERMISSION_DEFAULT))); } // create snapshot restore directory Path restoreDir = new Path(conf.get(SNAPSHOT_RESTORE_TMP_DIR, SNAPSHOT_RESTORE_TMP_DIR_DEFAULT)); if (!fs.exists(restoreDir)) { fs.mkdirs(restoreDir); - fs.setPermission(restoreDir, ACL_ENABLE_RESTORE_HFILE_PERMISSION); + fs.setPermission(restoreDir, new FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION, + SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT))); } } @@ -235,10 +242,9 @@ public boolean resetTableAcl(TableName tableName) { public boolean removeNamespaceAcl(TableName tableName, Set removeUsers) { try { long start = System.currentTimeMillis(); - List aclEntries = removeUsers.stream() - .map(removeUser -> new AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(USER) - .setName(removeUser).setPermission(FsAction.READ_EXECUTE).build()) - .collect(Collectors.toList()); + List aclEntries = + removeUsers.stream().map(removeUser -> aclEntry(ACCESS, removeUser, READ_EXECUTE)) + .collect(Collectors.toList()); Path nsPath = pathHelper.getDataNsDir(tableName.getNamespaceAsString()); fs.removeAclEntries(nsPath, aclEntries); LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName, @@ -259,11 +265,9 @@ public void addTableAcl(TableName tableName, String user) { try { long start = System.currentTimeMillis(); List aclEntries = new ArrayList<>(2); - AclEntry accessAclEntry = new AclEntry.Builder().setScope(AclEntryScope.ACCESS).setType(USER) - .setName(user).setPermission(FsAction.READ_EXECUTE).build(); + AclEntry accessAclEntry = aclEntry(ACCESS, user, READ_EXECUTE); aclEntries.add(accessAclEntry); - aclEntries.add(new AclEntry.Builder().setScope(AclEntryScope.DEFAULT).setType(USER) - .setName(user).setPermission(FsAction.READ_EXECUTE).build()); + aclEntries.add(aclEntry(DEFAULT, user, READ_EXECUTE)); // set access and default HDFS acl for table dir fs.modifyAclEntries(pathHelper.getTmpTableDir(tableName), aclEntries); fs.modifyAclEntries(pathHelper.getArchiveTableDir(tableName), aclEntries); @@ -540,6 +544,12 @@ private CompletableFuture setHDFSAclParallel(List operat return future; } + private static AclEntry aclEntry(AclEntryScope scope, String name, FsAction action) { + return new AclEntry.Builder().setScope(scope) + .setType(AuthUtil.isGroupPrincipal(name) ? GROUP : USER).setName(name).setPermission(action) + .build(); + } + /** * Inner class used to describe modify or remove what acl entries for files or directories(and * child files) @@ -629,11 +639,6 @@ private List getDefaultAclEntries(Set users, FsAction action) } return dirAclList; } - - private AclEntry aclEntry(AclEntryScope scope, String name, FsAction action) { - return new AclEntry.Builder().setScope(scope).setType(USER).setName(name) - .setPermission(action).build(); - } } protected static final class PathHelper { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestHDFSAclController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java similarity index 95% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestHDFSAclController.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java index 6b2012038c43..4ad4bce153ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestHDFSAclController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestSnapshotScannerHDFSAclController.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -60,13 +61,14 @@ import org.slf4j.LoggerFactory; @Category({ SecurityTests.class, LargeTests.class }) -public class TestHDFSAclController { +public class TestSnapshotScannerHDFSAclController { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHDFSAclController.class); + HBaseClassTestRule.forClass(TestSnapshotScannerHDFSAclController.class); @Rule public TestName name = new TestName(); - private static final Logger LOG = LoggerFactory.getLogger(TestHDFSAclController.class); + private static final Logger LOG = + LoggerFactory.getLogger(TestSnapshotScannerHDFSAclController.class); private static final String UN_GRANT_USER = "un_grant_user"; private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -82,16 +84,16 @@ public static void setupBeforeClass() throws Exception { conf.setBoolean("dfs.namenode.acls.enabled", true); conf.set("fs.permissions.umask-mode", "027"); // enable hbase hdfs acl feature - conf.setBoolean(HDFSAclHelper.HDFS_ACL_ENABLE, true); + conf.setBoolean(SnapshotScannerHDFSAclHelper.USER_SCAN_SNAPSHOT_ENABLE, true); // enable secure conf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); - conf.set(HDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR, - HDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT); + conf.set(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR, + SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT); SecureTestUtil.enableSecurity(conf); - // add HDFSAclController coprocessor + // add SnapshotScannerHDFSAclController coprocessor conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) + "," - + HDFSAclController.class.getName()); + + SnapshotScannerHDFSAclController.class.getName()); TEST_UTIL.startMiniCluster(); admin = TEST_UTIL.getAdmin(); @@ -100,20 +102,26 @@ public static void setupBeforeClass() throws Exception { unGrantUser = User.createUserForTesting(conf, UN_GRANT_USER, new String[] {}); // set hbase directory permission + FsPermission commonDirectoryPermission = + new FsPermission(conf.get(SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION, + SnapshotScannerHDFSAclHelper.COMMON_DIRECTORY_PERMISSION_DEFAULT)); Path path = rootDir; while (path != null) { - fs.setPermission(path, HDFSAclHelper.ACL_ENABLE_PUBLIC_HFILE_PERMISSION); + fs.setPermission(path, commonDirectoryPermission); path = path.getParent(); } // set restore directory permission - Path restoreDir = new Path(HDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT); + Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT); if (!fs.exists(restoreDir)) { fs.mkdirs(restoreDir); - fs.setPermission(restoreDir, HDFSAclHelper.ACL_ENABLE_RESTORE_HFILE_PERMISSION); + fs.setPermission(restoreDir, + new FsPermission( + conf.get(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION, + SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT))); } path = restoreDir.getParent(); while (path != null) { - fs.setPermission(path, HDFSAclHelper.ACL_ENABLE_PUBLIC_HFILE_PERMISSION); + fs.setPermission(path, commonDirectoryPermission); path = path.getParent(); } @@ -641,7 +649,7 @@ private static PrivilegedExceptionAction getScanSnapshotAction(Configurati String snapshotName, long expectedRowCount) { PrivilegedExceptionAction action = () -> { try { - Path restoreDir = new Path(HDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT); + Path restoreDir = new Path(SnapshotScannerHDFSAclHelper.SNAPSHOT_RESTORE_TMP_DIR_DEFAULT); Scan scan = new Scan(); TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);