diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 9cee166f7768..b78ce0fd967c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1503,6 +1503,33 @@ public enum OperationStatusCode { public static final String SNAPSHOT_CLEANER_DISABLE = "hbase.master.cleaner.snapshot.disable"; + /** + * Configurations for master executor services. + */ + public static final String MASTER_OPEN_REGION_THREADS = + "hbase.master.executor.openregion.threads"; + public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5; + + public static final String MASTER_CLOSE_REGION_THREADS = + "hbase.master.executor.closeregion.threads"; + public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5; + + public static final String MASTER_SERVER_OPERATIONS_THREADS = + "hbase.master.executor.serverops.threads"; + public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5; + + public static final String MASTER_META_SERVER_OPERATIONS_THREADS = + "hbase.master.executor.meta.serverops.threads"; + public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5; + + public static final String MASTER_LOG_REPLAY_OPS_THREADS = + "hbase.master.executor.logreplayops.threads"; + public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10; + + public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS = + "hbase.master.executor.snapshot.threads"; + public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3; + private HConstants() { // Can't be instantiated with this ctor. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 1ae9db266adb..80c2717f0a38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -206,13 +206,13 @@ public enum EventType { * C_M_SNAPSHOT_TABLE
* Client asking Master to snapshot an offline table. */ - C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS), + C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS), /** * Messages originating from Client to Master.
* C_M_RESTORE_SNAPSHOT
* Client asking Master to restore a snapshot. */ - C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS), + C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS), // Updates from master to ZK. This is done by the master and there is // nothing to process by either Master or RS @@ -314,11 +314,6 @@ public static EventType get(final int code) { throw new IllegalArgumentException("Unknown code " + code); } - public boolean isOnlineSchemaChangeSupported() { - return this.equals(EventType.C_M_ADD_FAMILY) || this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || this.equals(EventType.C_M_MODIFY_TABLE); - } - ExecutorType getExecutorServiceType() { return this.executor; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 596385d0caa0..66baccb95fd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -34,6 +34,7 @@ public enum ExecutorType { MASTER_RS_SHUTDOWN (5), MASTER_META_SERVER_OPERATIONS (6), M_LOG_REPLAY_OPS (7), + MASTER_SNAPSHOT_OPERATIONS (8), // RegionServer executor services RS_OPEN_REGION (20), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0d379438a8af..10a4dd8370ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1396,18 +1396,23 @@ public TableStateManager getTableStateManager() { * as OOMEs; it should be lightly loaded. See what HRegionServer does if * need to install an unexpected exception handler. */ - private void startServiceThreads() throws IOException{ - // Start the executor service pools - this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, - conf.getInt("hbase.master.executor.openregion.threads", 5)); - this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, - conf.getInt("hbase.master.executor.closeregion.threads", 5)); - this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.serverops.threads", 5)); - this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, - conf.getInt("hbase.master.executor.meta.serverops.threads", 5)); - this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, - conf.getInt("hbase.master.executor.logreplayops.threads", 10)); + private void startServiceThreads() throws IOException { + // Start the executor service pools + this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt( + HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt( + HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, + conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS, + HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, + conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS, + HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt( + HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT)); + this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, + conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS, + HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT)); // We depend on there being only one instance of this executor running // at a time. To do concurrency, would need fencing of enable/disable of diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java index f6e9409a5156..205c6c6f0900 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.StringWriter; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -219,5 +221,36 @@ public boolean evaluate() throws Exception { executorService.shutdown(); } + @Test + public void testSnapshotHandlers() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + final Server server = mock(Server.class); + when(server.getConfiguration()).thenReturn(conf); + + ExecutorService executorService = new ExecutorService("testSnapshotHandlers"); + executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1); + + CountDownLatch latch = new CountDownLatch(1); + executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) { + @Override + public void process() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + + int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) + .getThreadPoolExecutor().getActiveCount(); + Assert.assertEquals(activeCount, 1); + latch.countDown(); + Waiter.waitFor(conf, 3000, () -> { + int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS) + .getThreadPoolExecutor().getActiveCount(); + return count == 0; + }); + } }