From 4ff9ac4a8dae4c7a9a67ffffb45b57fd550d3c69 Mon Sep 17 00:00:00 2001
From: yuqi
Date: Wed, 10 Jul 2024 20:56:38 +0800
Subject: [PATCH] Support hold multiple tree lock within a thread at the same time

---
 .../gravitino/lock/LockManager.java           |  6 +-
 .../datastrato/gravitino/lock/TreeLock.java   | 21 +++-
 .../gravitino/lock/TreeLockNode.java          | 99 ++++++++++++++-----
 .../gravitino/lock/TestTreeLockUtils.java     | 51 ++++++++++
 4 files changed, 148 insertions(+), 29 deletions(-)
 create mode 100644 core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java

diff --git a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
index b1dbb27fed2..9fb0ef6e1b2 100644
--- a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
+++ b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
@@ -132,12 +132,12 @@ void checkDeadLock(TreeLockNode node) {
     // Check self
     node.getHoldingThreadTimestamp()
         .forEach(
-            (thread, ts) -> {
+            (threadIdentifier, ts) -> {
               // If the thread is holding the lock for more than 30 seconds, we will log it.
               if (System.currentTimeMillis() - ts > 30000) {
                 LOG.warn(
-                    "Dead lock detected for thread {} on node {}, threads that holding the node: {} ",
-                    thread,
+                    "Dead lock detected for thread with identifier {} on node {}, threads that holding the node: {} ",
+                    threadIdentifier,
                     node,
                     node.getHoldingThreadTimestamp());
               }
diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
index 76d9ab02887..02cb0c757b2 100644
--- a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
+++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
@@ -104,8 +104,17 @@ public void lock(LockType lockType) {
       try {
         treeLockNode.lock(type);
         heldLocks.push(Pair.of(treeLockNode, type));
+
+        treeLockNode.addHoldingThreadTimestamp(
+            Thread.currentThread(), identifier, System.currentTimeMillis());
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Locked node: {}, lock type: {}", treeLockNode, type);
+          LOG.trace(
+              "Node {} has been lock with '{}' lock, hold by {} with ident '{}' at {}",
+              treeLockNode,
+              type,
+              Thread.currentThread(),
+              identifier,
+              System.currentTimeMillis());
         }
       } catch (Exception e) {
         LOG.error(
@@ -140,8 +149,16 @@ public void unlock() {
       TreeLockNode current = pair.getLeft();
       LockType type = pair.getRight();
       current.unlock(type);
+
+      long holdStartTime = current.removeHoldingThreadTimestamp(Thread.currentThread(), identifier);
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Unlocked node: {}, lock type: {}", current, type);
+        LOG.trace(
+            "Node {} has been unlock with '{}' lock, hold by {} with ident '{}' for {} ms",
+            current,
+            type,
+            Thread.currentThread(),
+            identifier,
+            System.currentTimeMillis() - holdStartTime);
       }
     }
 
diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
index a4953c54104..92db979aa91 100644
--- a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
+++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
@@ -19,6 +19,7 @@
 
 package com.datastrato.gravitino.lock;
 
+import com.datastrato.gravitino.NameIdentifier;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
@@ -44,13 +45,60 @@ public class TreeLockNode {
   private final String name;
   private final ReentrantReadWriteLock readWriteLock;
   @VisibleForTesting final Map<String, TreeLockNode> childMap;
-  private final Map<Thread, Long> holdingThreadTimestamp = new ConcurrentHashMap<>();
+
+  private final Map<ThreadIdentifier, Long> holdingThreadTimestamp = new ConcurrentHashMap<>();
 
   // The reference count of this node. The reference count is used to track the number of the
   // TreeLocks that are using this node. If the reference count is 0, it means that no TreeLock is
   // using this node, and this node can be removed from the tree.
   private final AtomicLong referenceCount = new AtomicLong();
 
+  /**
+   * Identifies a holder of a tree lock node: a thread plus the identifier it is locking. The
+   * thread alone is not enough because one thread can hold multiple tree locks at the same time.
+   * For example, a thread can hold the lock of the root node and the lock of a child node at the
+   * same time.
+   */
+  static class ThreadIdentifier {
+    private final Thread thread;
+    private final NameIdentifier ident;
+
+    public ThreadIdentifier(Thread thread, NameIdentifier ident) {
+      this.thread = thread;
+      this.ident = ident;
+    }
+
+    static ThreadIdentifier of(Thread thread, NameIdentifier identifier) {
+      return new ThreadIdentifier(thread, identifier);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof ThreadIdentifier)) {
+        return false;
+      }
+      ThreadIdentifier that = (ThreadIdentifier) o;
+      return Objects.equal(thread, that.thread) && Objects.equal(ident, that.ident);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(thread, ident);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("ThreadIdentifier{");
+      sb.append("thread=").append(thread);
+      sb.append(", ident=").append(ident);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
   protected TreeLockNode(String name) {
     this.name = name;
     this.readWriteLock = new ReentrantReadWriteLock();
@@ -61,10 +109,35 @@ public String getName() {
     return name;
   }
 
-  Map<Thread, Long> getHoldingThreadTimestamp() {
+  Map<ThreadIdentifier, Long> getHoldingThreadTimestamp() {
     return holdingThreadTimestamp;
   }
 
+  /**
+   * Record the time at which a thread started holding this node on behalf of an identifier.
+   *
+   * @param currentThread The thread that holds this node.
+   * @param identifier The identifier of the tree lock the thread is holding this node for.
+   * @param timestamp The time the hold started, in milliseconds since the epoch.
+   */
+  void addHoldingThreadTimestamp(Thread currentThread, NameIdentifier identifier, long timestamp) {
+    holdingThreadTimestamp.put(ThreadIdentifier.of(currentThread, identifier), timestamp);
+  }
+
+  /**
+   * Remove the hold record of a thread and identifier on this node.
+   *
+   * @param currentThread The thread that held this node.
+   * @param identifier The identifier of the tree lock the thread held this node for.
+   * @return The recorded start time of the hold, or the current time if no record exists.
+   */
+  long removeHoldingThreadTimestamp(Thread currentThread, NameIdentifier identifier) {
+    Long startTime = holdingThreadTimestamp.remove(ThreadIdentifier.of(currentThread, identifier));
+    // A record can be missing when the same thread locks the same identifier more than once: the
+    // entries share one key, so the first unlock removes it. Unboxing null would throw here.
+    return startTime != null ? startTime : System.currentTimeMillis();
+  }
+
   /**
    * Increase the reference count of this node. The reference count should always be greater than or
    * equal to 0.
@@ -97,17 +170,6 @@ void lock(LockType lockType) {
     } else {
       readWriteLock.writeLock().lock();
     }
-
-    holdingThreadTimestamp.put(Thread.currentThread(), System.currentTimeMillis());
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(
-          "Node {} has been lock with '{}' lock, hold by {} at {}, current holding threads: {}",
-          this,
-          lockType,
-          Thread.currentThread(),
-          System.currentTimeMillis(),
-          holdingThreadTimestamp);
-    }
   }
 
   /**
@@ -125,17 +187,6 @@ void unlock(LockType lockType) {
     }
 
     this.referenceCount.decrementAndGet();
-
-    long holdStartTime = holdingThreadTimestamp.remove(Thread.currentThread());
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(
-          "Node {} has been unlock with '{}' lock, hold by {} for {} ms, current holding threads: {}",
-          this,
-          lockType,
-          Thread.currentThread(),
-          System.currentTimeMillis() - holdStartTime,
-          holdingThreadTimestamp);
-    }
   }
 
   /**
diff --git a/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java
new file mode 100644
index 00000000000..43fb2244334
--- /dev/null
+++ b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastrato.gravitino.lock;
+
+import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import com.datastrato.gravitino.Config;
+import com.datastrato.gravitino.GravitinoEnv;
+import com.datastrato.gravitino.NameIdentifier;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.Test;
+
+public class TestTreeLockUtils {
+
+  @Test
+  void testHolderMultipleLock() throws Exception {
+    Config config = mock(Config.class);
+    doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
+
+    TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of("test"),
+        LockType.READ,
+        () ->
+            TreeLockUtils.doWithTreeLock(
+                NameIdentifier.of("test", "test1"), LockType.WRITE, () -> null));
+  }
+}