Skip to content

Commit

Permalink
[apache#780] improvment(core): Remove RW lock in KvEntityStore as apa…
Browse files Browse the repository at this point in the history
…che#779 has guaranteed thread safe. (apache#2311)

### What changes were proposed in this pull request?

Remove the RW lock that guard concurrent issues in `KvEntityStore`.

### Why are the changes needed?

apache#779 has introduced tree lock to guarantee thread-safe, so we can remove
it.

Fix: apache#780 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Existing tests can cover it.
  • Loading branch information
yuqi1129 authored Feb 22, 2024
1 parent 4333190 commit 15e8a4e
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 198 deletions.
239 changes: 110 additions & 129 deletions core/src/main/java/com/datastrato/gravitino/storage/kv/KvEntityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,16 @@ public <E extends Entity & HasIdentifier> List<E> list(

byte[] endKey = Bytes.increment(Bytes.wrap(startKey)).get();
List<Pair<byte[], byte[]>> kvs =
FunctionUtils.executeWithReadLock(
executeInTransaction(
() ->
executeInTransaction(
() ->
transactionalKvBackend.scan(
new KvRange.KvRangeBuilder()
.start(startKey)
.end(endKey)
.startInclusive(true)
.endInclusive(false)
.limit(Integer.MAX_VALUE)
.build())),
reentrantReadWriteLock);
transactionalKvBackend.scan(
new KvRange.KvRangeBuilder()
.start(startKey)
.end(endKey)
.startInclusive(true)
.endInclusive(false)
.limit(Integer.MAX_VALUE)
.build()));
for (Pair<byte[], byte[]> pairs : kvs) {
entities.add(serDe.deserialize(pairs.getRight(), e));
}
Expand All @@ -147,73 +144,63 @@ public <E extends Entity & HasIdentifier> List<E> list(

@Override
public boolean exists(NameIdentifier ident, EntityType entityType) throws IOException {
return FunctionUtils.executeWithReadLock(
() ->
executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident, entityType, true);
if (key == null) {
return false;
}
return transactionalKvBackend.get(key) != null;
}),
reentrantReadWriteLock);
return executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident, entityType, true);
if (key == null) {
return false;
}
return transactionalKvBackend.get(key) != null;
});
}

@Override
public <E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
FunctionUtils.executeWithWriteLock(
() ->
executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(e.nameIdentifier(), e.type());
byte[] value = serDe.serialize(e);
transactionalKvBackend.put(key, value, overwritten);
return null;
}),
reentrantReadWriteLock);
executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(e.nameIdentifier(), e.type());
byte[] value = serDe.serialize(e);
transactionalKvBackend.put(key, value, overwritten);
return null;
});
}

@Override
public <E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Class<E> type, EntityType entityType, Function<E, E> updater)
throws IOException, NoSuchEntityException, AlreadyExistsException {
return FunctionUtils.executeWithWriteLock(
() ->
executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident, entityType);
byte[] value = transactionalKvBackend.get(key);
if (value == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}

E e = serDe.deserialize(value, type);
E updatedE = updater.apply(e);
if (updatedE.nameIdentifier().equals(ident)) {
transactionalKvBackend.put(key, serDe.serialize(updatedE), true);
return updatedE;
}

// If we have changed the name of the entity, We would do the following steps:
// Check whether the new entities already existed
boolean newEntityExist = exists(updatedE.nameIdentifier(), entityType);
if (newEntityExist) {
throw new AlreadyExistsException(
"Entity %s already exist, please check again", updatedE.nameIdentifier());
}

// Update the name mapping
nameMappingService.updateName(
generateKeyForMapping(ident),
generateKeyForMapping(updatedE.nameIdentifier()));

// Update the entity to store
transactionalKvBackend.put(key, serDe.serialize(updatedE), true);
return updatedE;
}),
reentrantReadWriteLock);
return executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident, entityType);
byte[] value = transactionalKvBackend.get(key);
if (value == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}

E e = serDe.deserialize(value, type);
E updatedE = updater.apply(e);
if (updatedE.nameIdentifier().equals(ident)) {
transactionalKvBackend.put(key, serDe.serialize(updatedE), true);
return updatedE;
}

// If we have changed the name of the entity, We would do the following steps:
// Check whether the new entities already existed
boolean newEntityExist = exists(updatedE.nameIdentifier(), entityType);
if (newEntityExist) {
throw new AlreadyExistsException(
"Entity %s already exist, please check again", updatedE.nameIdentifier());
}

// Update the name mapping
nameMappingService.updateName(
generateKeyForMapping(ident), generateKeyForMapping(updatedE.nameIdentifier()));

// Update the entity to store
transactionalKvBackend.put(key, serDe.serialize(updatedE), true);
return updatedE;
});
}

private String concatIdAndName(long[] namespaceIds, String name) {
Expand Down Expand Up @@ -279,17 +266,14 @@ public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, EntityType entityType, Class<E> e)
throws NoSuchEntityException, IOException {
byte[] value =
FunctionUtils.executeWithReadLock(
() ->
executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident, entityType, true);
if (key == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}
return transactionalKvBackend.get(key);
}),
reentrantReadWriteLock);
executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident, entityType, true);
if (key == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}
return transactionalKvBackend.get(key);
});
if (value == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}
Expand Down Expand Up @@ -353,56 +337,53 @@ private byte[] replacePrefixTypeInfo(byte[] encode, String subTypePrefix) {
@Override
public boolean delete(NameIdentifier ident, EntityType entityType, boolean cascade)
throws IOException {
return FunctionUtils.executeWithWriteLock(
() ->
executeInTransaction(
() -> {
if (!exists(ident, entityType)) {
return false;
}

byte[] dataKey = entityKeyEncoder.encode(ident, entityType, true);
List<byte[]> subEntityPrefix = getSubEntitiesPrefix(ident, entityType);
if (subEntityPrefix.isEmpty()) {
// has no sub-entities
return transactionalKvBackend.delete(dataKey);
}

byte[] directChild = Iterables.getLast(subEntityPrefix);
byte[] endKey = Bytes.increment(Bytes.wrap(directChild)).get();
List<Pair<byte[], byte[]>> kvs =
transactionalKvBackend.scan(
new KvRange.KvRangeBuilder()
.start(directChild)
.end(endKey)
.startInclusive(true)
.endInclusive(false)
.limit(1)
.build());

if (!cascade && !kvs.isEmpty()) {
List<NameIdentifier> subEntities = Lists.newArrayListWithCapacity(kvs.size());
for (Pair<byte[], byte[]> pair : kvs) {
subEntities.add(entityKeyEncoder.decode(pair.getLeft()).getLeft());
}

throw new NonEmptyEntityException(
"Entity %s has sub-entities %s, you should remove sub-entities first",
ident, subEntities);
}

for (byte[] prefix : subEntityPrefix) {
transactionalKvBackend.deleteRange(
new KvRange.KvRangeBuilder()
.start(prefix)
.startInclusive(true)
.end(Bytes.increment(Bytes.wrap(prefix)).get())
.build());
}

return transactionalKvBackend.delete(dataKey);
}),
reentrantReadWriteLock);
return executeInTransaction(
() -> {
if (!exists(ident, entityType)) {
return false;
}

byte[] dataKey = entityKeyEncoder.encode(ident, entityType, true);
List<byte[]> subEntityPrefix = getSubEntitiesPrefix(ident, entityType);
if (subEntityPrefix.isEmpty()) {
// has no sub-entities
return transactionalKvBackend.delete(dataKey);
}

byte[] directChild = Iterables.getLast(subEntityPrefix);
byte[] endKey = Bytes.increment(Bytes.wrap(directChild)).get();
List<Pair<byte[], byte[]>> kvs =
transactionalKvBackend.scan(
new KvRange.KvRangeBuilder()
.start(directChild)
.end(endKey)
.startInclusive(true)
.endInclusive(false)
.limit(1)
.build());

if (!cascade && !kvs.isEmpty()) {
List<NameIdentifier> subEntities = Lists.newArrayListWithCapacity(kvs.size());
for (Pair<byte[], byte[]> pair : kvs) {
subEntities.add(entityKeyEncoder.decode(pair.getLeft()).getLeft());
}

throw new NonEmptyEntityException(
"Entity %s has sub-entities %s, you should remove sub-entities first",
ident, subEntities);
}

for (byte[] prefix : subEntityPrefix) {
transactionalKvBackend.deleteRange(
new KvRange.KvRangeBuilder()
.start(prefix)
.startInclusive(true)
.end(Bytes.increment(Bytes.wrap(prefix)).get())
.build());
}

return transactionalKvBackend.delete(dataKey);
});
}

@Override
Expand Down
Loading

0 comments on commit 15e8a4e

Please sign in to comment.