From 2e3787c43d27a93baf5c7262551f243cd8ca3d43 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 29 Sep 2022 10:08:02 +0800 Subject: [PATCH] HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803) Signed-off-by: Xin Sun --- .../hbase/procedure2/LockedResourceType.java | 3 +- .../procedure/GlobalProcedureInterface.java | 29 +++++ .../hbase/master/procedure/GlobalQueue.java | 35 ++++++ .../procedure/MasterProcedureScheduler.java | 119 +++++++++++++++++- .../hbase/master/procedure/SchemaLocking.java | 18 ++- .../TestMasterProcedureScheduler.java | 48 +++++++ 6 files changed, 246 insertions(+), 6 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java index 12f899d7565b..401410170097 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java @@ -26,5 +26,6 @@ public enum LockedResourceType { TABLE, REGION, PEER, - META + META, + GLOBAL } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java new file mode 100644 index 000000000000..1ef168abfd8f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Procedure interface for global operations, such as migration. + */ +@InterfaceAudience.Private +public interface GlobalProcedureInterface { + + String getGlobalId(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java new file mode 100644 index 000000000000..1633dc4856e7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class GlobalQueue extends Queue { + + public GlobalQueue(String globalId, LockStatus lockStatus) { + super(globalId, lockStatus); + } + + @Override + boolean requireExclusiveLock(Procedure proc) { + return true; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 866f2f6f4032..fbf0eb8abf32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; @@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { (n, k) -> n.compareKey((String) k); private final static AvlKeyComparator META_QUEUE_KEY_COMPARATOR = (n, k) -> n.compareKey((TableName) k); + private final static AvlKeyComparator GLOBAL_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((String) k); private final FairQueue serverRunQueue = new FairQueue<>(); private final FairQueue tableRunQueue = new FairQueue<>(); private final FairQueue peerRunQueue = new FairQueue<>(); private final FairQueue metaRunQueue = new FairQueue<>(); + private final FairQueue globalRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; private TableQueue tableMap = null; private PeerQueue peerMap = null; private MetaQueue metaMap = null; + private GlobalQueue globalMap = null; private final SchemaLocking locking; @@ -128,6 +133,8 @@ protected void enqueue(final Procedure proc, final boolean addFront) { doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); } else if (isPeerProcedure(proc)) { doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); + } else if (isGlobalProcedure(proc)) { + doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront); } else { // TODO: at the moment we only have Table and Server procedures // if you are implementing a non-table/non-server procedure, you have two options: create @@ -163,14 +170,19 @@ private > void doAdd(FairQueue fairq, Queue queue, @Override protected boolean queueHasRunnables() { - return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() - || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables(); + return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables() + || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() + || peerRunQueue.hasRunnables(); } @Override protected Procedure dequeue() { - // meta procedure is always the first priority - Procedure pollResult = doPoll(metaRunQueue); + // pull global first + Procedure pollResult = doPoll(globalRunQueue); + // then meta procedure + if (pollResult == null) { + pollResult = doPoll(metaRunQueue); + } // For now, let server handling have precedence over table handling; presumption is that it // is more important handling crashed servers than it is running the // enabling/disabling tables, etc. @@ -268,6 +280,14 @@ private void clearQueue() { clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); peerMap = null; + // Remove Meta + clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR); + metaMap = null; + + // Remove Global + clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR); + globalMap = null; + assert size() == 0 : "expected queue size to be 0, got " + size(); } @@ -300,6 +320,7 @@ protected int queueSize() { count += queueSize(tableMap); count += queueSize(peerMap); count += queueSize(metaMap); + count += queueSize(globalMap); return count; } @@ -502,6 +523,51 @@ private static boolean isMetaProcedure(Procedure proc) { return proc instanceof MetaProcedureInterface; } + // ============================================================================ + // Global Queue Lookup Helpers + // ============================================================================ + private GlobalQueue getGlobalQueue(String globalId) { + GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); + if (node != null) { + return node; + } + node = new GlobalQueue(globalId, locking.getGlobalLock(globalId)); + globalMap = AvlTree.insert(globalMap, node); + return node; + } + + private void removeGlobalQueue(String globalId) { + globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); + locking.removeGlobalLock(globalId); + } + + private void tryCleanupGlobalQueue(String globalId, Procedure procedure) { + schedLock(); + try { + GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); + if (queue == null) { + return; + } + + final LockAndQueue lock = locking.getGlobalLock(globalId); + if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(globalRunQueue, queue, + () -> "clean up global queue after " + procedure + " completed"); + removeGlobalQueue(globalId); + } + } finally { + schedUnlock(); + } + } + + private static boolean isGlobalProcedure(Procedure proc) { + return proc instanceof GlobalProcedureInterface; + } + + private static String getGlobalId(Procedure proc) { + return ((GlobalProcedureInterface) proc).getGlobalId(); + } + // ============================================================================ // Table Locking Helpers // ============================================================================ @@ -1006,6 +1072,51 @@ public void wakeMetaExclusiveLock(Procedure procedure) { } } + // ============================================================================ + // Global Locking Helpers + // ============================================================================ + /** + * Try to acquire the share lock on global. + * @see #wakeGlobalExclusiveLock(Procedure, String) + * @param procedure the procedure trying to acquire the lock + * @return true if the procedure has to wait for global to be available + */ + public boolean waitGlobalExclusiveLock(Procedure procedure, String globalId) { + schedLock(); + try { + final LockAndQueue lock = locking.getGlobalLock(globalId); + if (lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId), + () -> procedure + " held shared lock"); + return false; + } + waitProcedure(lock, procedure); + logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING); + return true; + } finally { + schedUnlock(); + } + } + + /** + * Wake the procedures waiting for global. + * @see #waitGlobalExclusiveLock(Procedure, String) + * @param procedure the procedure releasing the lock + */ + public void wakeGlobalExclusiveLock(Procedure procedure, String globalId) { + schedLock(); + try { + final LockAndQueue lock = locking.getGlobalLock(globalId); + lock.releaseExclusiveLock(procedure); + addToRunQueue(globalRunQueue, getGlobalQueue(globalId), + () -> procedure + " released shared lock"); + int waitingCount = wakeWaitingProcedures(lock); + wakePollIfNeeded(waitingCount); + } finally { + schedUnlock(); + } + } + /** * For debugging. Expensive. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 13419ac455ca..853d13b0c93b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -53,6 +53,7 @@ class SchemaLocking { // Single map for all regions irrespective of tables. Key is encoded region name. private final Map regionLocks = new HashMap<>(); private final Map peerLocks = new HashMap<>(); + private final Map globalLocks = new HashMap<>(); private final LockAndQueue metaLock; public SchemaLocking(Function> procedureRetriever) { @@ -94,6 +95,10 @@ LockAndQueue getMetaLock() { return metaLock; } + LockAndQueue getGlobalLock(String globalId) { + return getLock(globalLocks, globalId); + } + LockAndQueue removeRegionLock(String encodedRegionName) { return regionLocks.remove(encodedRegionName); } @@ -114,6 +119,10 @@ LockAndQueue removePeerLock(String peerId) { return peerLocks.remove(peerId); } + LockAndQueue removeGlobalLock(String globalId) { + return globalLocks.remove(globalId); + } + private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName, LockAndQueue queue) { LockType lockType; @@ -164,6 +173,8 @@ List getLocks() { addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER); addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock), tn -> tn.getNameAsString(), LockedResourceType.META); + addToLockedResources(lockedResources, globalLocks, Function.identity(), + LockedResourceType.GLOBAL); return lockedResources; } @@ -191,6 +202,10 @@ LockedResource getLockResource(LockedResourceType resourceType, String resourceN break; case META: queue = metaLock; + break; + case GLOBAL: + queue = globalLocks.get(resourceName); + break; default: queue = null; break; @@ -216,7 +231,8 @@ public String toString() { + filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) + ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks=" + filterUnlocked(this.peerLocks) + ", metaLocks=" - + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)); + + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks=" + + filterUnlocked(globalLocks); } private String filterUnlocked(Map locks) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index f0edf73715ea..0cf34126a945 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -940,6 +940,21 @@ public PeerOperationType getPeerOperationType() { } } + public static class TestGlobalProcedure extends TestProcedure + implements GlobalProcedureInterface { + private final String globalId; + + public TestGlobalProcedure(long procId, String globalId) { + super(procId); + this.globalId = globalId; + } + + @Override + public String getGlobalId() { + return globalId; + } + } + private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception { LockProcedure procedure = new LockProcedure(); @@ -1093,6 +1108,39 @@ public void testListLocksPeer() throws Exception { assertEquals(1, resource.getWaitingProcedures().size()); } + @Test + public void testListLocksGlobal() throws Exception { + String globalId = "1"; + LockProcedure procedure = createExclusiveLockProcedure(4); + queue.waitGlobalExclusiveLock(procedure, globalId); + + List locks = queue.getLocks(); + assertEquals(1, locks.size()); + + LockedResource resource = locks.get(0); + assertLockResource(resource, LockedResourceType.GLOBAL, globalId); + assertExclusiveLock(resource, procedure); + assertTrue(resource.getWaitingProcedures().isEmpty()); + + // Try to acquire the exclusive lock again with same procedure + assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId)); + + // Try to acquire the exclusive lock again with new procedure + LockProcedure procedure2 = createExclusiveLockProcedure(5); + assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId)); + + // Same peerId, still only has 1 LockedResource + locks = queue.getLocks(); + assertEquals(1, locks.size()); + + resource = locks.get(0); + assertLockResource(resource, LockedResourceType.GLOBAL, globalId); + // LockedResource owner still is the origin procedure + assertExclusiveLock(resource, procedure); + // The new procedure should in the waiting list + assertEquals(1, resource.getWaitingProcedures().size()); + } + @Test public void testListLocksWaiting() throws Exception { LockProcedure procedure1 = createExclusiveLockProcedure(1);