Skip to content

Commit

Permalink
[#4129] improvement(core): Support hold multiple tree lock within a t…
Browse files Browse the repository at this point in the history
…hread at the same time (#4130)

### What changes were proposed in this pull request?

Add the value of the name identifier in the holdingThreadTimestamp to
support holding multiple tree lock at the same time.

### Why are the changes needed?

To support more user sceanrio

Fix: #4129 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Add new test class `TestTreeLockUtils`
  • Loading branch information
yuqi1129 authored Jul 12, 2024
1 parent 87b58fe commit 1b1ef58
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ void checkDeadLock(TreeLockNode node) {
// Check self
node.getHoldingThreadTimestamp()
.forEach(
(thread, ts) -> {
(threadIdentifier, ts) -> {
// If the thread is holding the lock for more than 30 seconds, we will log it.
if (System.currentTimeMillis() - ts > 30000) {
LOG.warn(
"Dead lock detected for thread {} on node {}, threads that holding the node: {} ",
thread,
"Dead lock detected for thread with identifier {} on node {}, threads that holding the node: {} ",
threadIdentifier,
node,
node.getHoldingThreadTimestamp());
}
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,17 @@ public void lock(LockType lockType) {
try {
treeLockNode.lock(type);
heldLocks.push(Pair.of(treeLockNode, type));

treeLockNode.addHoldingThreadTimestamp(
Thread.currentThread(), identifier, System.currentTimeMillis());
if (LOG.isTraceEnabled()) {
LOG.trace("Locked node: {}, lock type: {}", treeLockNode, type);
LOG.trace(
"Node {} has been lock with '{}' lock, hold by {} with ident '{}' at {}",
this,
lockType,
Thread.currentThread(),
identifier,
System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error(
Expand Down Expand Up @@ -140,8 +149,16 @@ public void unlock() {
TreeLockNode current = pair.getLeft();
LockType type = pair.getRight();
current.unlock(type);

long holdStartTime = current.removeHoldingThreadTimestamp(Thread.currentThread(), identifier);
if (LOG.isTraceEnabled()) {
LOG.trace("Unlocked node: {}, lock type: {}", current, type);
LOG.trace(
"Node {} has been unlock with '{}' lock, hold by {} with ident '{}' for {} ms",
this,
lockType,
Thread.currentThread(),
identifier,
System.currentTimeMillis() - holdStartTime);
}
}

Expand Down
82 changes: 58 additions & 24 deletions core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package com.datastrato.gravitino.lock;

import com.datastrato.gravitino.NameIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
Expand All @@ -44,13 +45,60 @@ public class TreeLockNode {
private final String name;
private final ReentrantReadWriteLock readWriteLock;
@VisibleForTesting final Map<String, TreeLockNode> childMap;
private final Map<Thread, Long> holdingThreadTimestamp = new ConcurrentHashMap<>();

private final Map<ThreadIdentifier, Long> holdingThreadTimestamp = new ConcurrentHashMap<>();

// The reference count of this node. The reference count is used to track the number of the
// TreeLocks that are using this node. If the reference count is 0, it means that no TreeLock is
// using this node, and this node can be removed from the tree.
private final AtomicLong referenceCount = new AtomicLong();

/**
* The identifier of a thread. This class is used to identify this tree lock node is held by which
* thread and identifier because one thread can hold multiple tree lock nodes at the same time.
* For example, a thread can hold the lock of the root node and the lock of the child node at the
* same time.
*/
static class ThreadIdentifier {
private final Thread thread;
private final NameIdentifier ident;

public ThreadIdentifier(Thread thread, NameIdentifier ident) {
this.thread = thread;
this.ident = ident;
}

static ThreadIdentifier of(Thread thread, NameIdentifier identifier) {
return new ThreadIdentifier(thread, identifier);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof ThreadIdentifier)) {
return false;
}
ThreadIdentifier that = (ThreadIdentifier) o;
return Objects.equal(thread, that.thread) && Objects.equal(ident, that.ident);
}

@Override
public int hashCode() {
return Objects.hashCode(thread, ident);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ThreadIdentifier{");
sb.append("thread=").append(thread);
sb.append(", ident=").append(ident);
sb.append('}');
return sb.toString();
}
}

protected TreeLockNode(String name) {
this.name = name;
this.readWriteLock = new ReentrantReadWriteLock();
Expand All @@ -61,10 +109,18 @@ public String getName() {
return name;
}

Map<Thread, Long> getHoldingThreadTimestamp() {
Map<ThreadIdentifier, Long> getHoldingThreadTimestamp() {
return holdingThreadTimestamp;
}

void addHoldingThreadTimestamp(Thread currentThread, NameIdentifier identifier, long timestamp) {
holdingThreadTimestamp.put(ThreadIdentifier.of(currentThread, identifier), timestamp);
}

long removeHoldingThreadTimestamp(Thread currentThread, NameIdentifier identifier) {
return holdingThreadTimestamp.remove(ThreadIdentifier.of(currentThread, identifier));
}

/**
* Increase the reference count of this node. The reference count should always be greater than or
* equal to 0.
Expand Down Expand Up @@ -97,17 +153,6 @@ void lock(LockType lockType) {
} else {
readWriteLock.writeLock().lock();
}

holdingThreadTimestamp.put(Thread.currentThread(), System.currentTimeMillis());
if (LOG.isTraceEnabled()) {
LOG.trace(
"Node {} has been lock with '{}' lock, hold by {} at {}, current holding threads: {}",
this,
lockType,
Thread.currentThread(),
System.currentTimeMillis(),
holdingThreadTimestamp);
}
}

/**
Expand All @@ -125,17 +170,6 @@ void unlock(LockType lockType) {
}

this.referenceCount.decrementAndGet();

long holdStartTime = holdingThreadTimestamp.remove(Thread.currentThread());
if (LOG.isTraceEnabled()) {
LOG.trace(
"Node {} has been unlock with '{}' lock, hold by {} for {} ms, current holding threads: {}",
this,
lockType,
Thread.currentThread(),
System.currentTimeMillis() - holdStartTime,
holdingThreadTimestamp);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.datastrato.gravitino.lock;

import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.jupiter.api.Test;

public class TestTreeLockUtils {

@Test
void testHolderMultipleLock() throws Exception {
Config config = mock(Config.class);
doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);

TreeLockUtils.doWithTreeLock(
NameIdentifier.of("test"),
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
NameIdentifier.of("test", "test1"), LockType.WRITE, () -> null));
}
}

0 comments on commit 1b1ef58

Please sign in to comment.