Skip to content

Commit

Permalink
[#203] Fix bugs of new ID allocation in store entities (#204)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
- Allow nested transactions in RocksdbKvBackend
- Introduce transaction in method `put` of `KvEntityStore` 

### Why are the changes needed?
Currently, new ID allocation logic depends on transactions, So we have
to add transactions in the method `put` that will possibly trigger new
ID allocation.

Fix: #203 

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add some test code in existing UT `testCreateKvEntityStore`
  • Loading branch information
yuqi1129 authored Aug 10, 2023
1 parent a11949e commit 12e7186
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ public boolean exists(NameIdentifier ident, EntityType entityType) throws IOExce
@Override
public <E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
// Simple implementation, just use the entity's identifier as the key
byte[] key = entityKeyEncoder.encode(EntityIdentifer.of(e.nameIdentifier(), e.type()), true);
byte[] value = serDe.serialize(e);
backend.put(key, value, overwritten);
executeInTransaction(
() -> {
byte[] key =
entityKeyEncoder.encode(EntityIdentifer.of(e.nameIdentifier(), e.type()), true);
byte[] value = serDe.serialize(e);
backend.put(key, value, overwritten);
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public void close() throws IOException {
@Override
public <R, E extends Exception> R executeInTransaction(Executable<R, E> executable)
throws E, IOException {
// Already in tx, directly execute it without creating a new tx
if (TX_LOCAL.get() != null) {
return executable.execute();
}

Transaction tx = db.beginTransaction(new WriteOptions());
LOGGER.info("Starting transaction: {}", tx);
TX_LOCAL.set(tx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ public void testCreateKvEntityStore() {
NoSuchEntityException.class,
() -> store.get(metalake.nameIdentifier(), EntityType.METALAKE, BaseMetalake.class));

// Add new updateMetaLake.
// 'updatedMetalake2' is a new name, which will trigger id allocation
BaseMetalake updatedMetalake2 =
new BaseMetalake.Builder()
.withId(metalake.getId())
.withName("updatedMetalake2")
.withAuditInfo(auditInfo)
.withVersion(SchemaVersion.V_0_1)
.build();
store.put(updatedMetalake2);
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
Expand Down

0 comments on commit 12e7186

Please sign in to comment.