Skip to content

Commit

Permalink
[#112] refactor(core): improve the EntityStore interface to add mor…
Browse files Browse the repository at this point in the history
…e methods (#115)

### What changes were proposed in this pull request?

This PR refactors the `EntityStore` to add more interfaces, so that
other implementations could leverage this to achieve the specific
logics.

### Why are the changes needed?

Current EntityStore only has two methods related to store and retrieve,
which is not enough to satisfy all the needs for entity storage. So here
improved to add more methods.

Fix: #112 

### Does this PR introduce _any_ user-facing change?

Yes, this change refactors the current `EntityStore` interface to add
more methods.

### How was this patch tested?

Extends the current UT.
  • Loading branch information
jerryshao authored Jul 24, 2023
1 parent 7e2055d commit 89682eb
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 40 deletions.
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ dependencies {
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton;

public class EntityAlreadyExistsException extends RuntimeException {
public EntityAlreadyExistsException(String message) {
super(message);
}

public EntityAlreadyExistsException(String message, Throwable cause) {
super(message, cause);
}
}

This file was deleted.

84 changes: 78 additions & 6 deletions core/src/main/java/com/datastrato/graviton/EntityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
*/
package com.datastrato.graviton;

import com.datastrato.graviton.util.Executable;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public interface EntityStore extends Closeable {

Expand All @@ -21,19 +23,70 @@ public interface EntityStore extends Closeable {
void initialize(Config config) throws RuntimeException;

/**
* Store the entity into the underlying storage.
* Set the {@link EntitySerDe} for the entity store. {@link EntitySerDe} will be used to serialize
* and deserialize the entities to the target format, and vice versa.
*
* <p>Note. The implementation should be thread-safe, and should be able to handle concurrent
* @param entitySerDe the entity serde to set
*/
void setSerDe(EntitySerDe entitySerDe);

/**
* List all the entities with the specified {@link Namespace}, and deserialize them into the
* specified {@link Entity} object.
*
* <p>Note. Depends on the isolation levels provided by the underlying storage, the returned list
* may not be consistent.
*
* @param namespace the namespace of the entities
* @return the list of entities
* @param <E> the type of the entity
* @throws IOException if the list operation fails
*/
<E extends Entity & HasIdentifier> List<E> list(Namespace namespace) throws IOException;

/**
* Check if the entity with the specified {@link NameIdentifier} exists.
*
* @param ident the name identifier of the entity
* @return true if the entity exists, false otherwise
* @throws IOException if the check operation fails
*/
boolean exists(NameIdentifier ident) throws IOException;

/**
* Store the entity into the underlying storage. If the entity already exists, it will overwrite
* the existing entity.
*
* @param ident the unique identifier of the entity
* @param e the entity to store
* @param <E> the type of the entity
* @throws IOException if the store operation fails
*/
default <E extends Entity & HasIdentifier> void put(NameIdentifier ident, E e)
throws IOException {
put(ident, e, false);
}

/**
* Store the entity into the underlying storage. According to the {@code overwritten} flag, it
* will overwrite the existing entity or throw an {@link EntityAlreadyExistsException}.
*
* <p>Note. The implementation should be transactional, and should be able to handle concurrent
* store of entities.
*
* @param entity the entity to store
* @param ident the unique identifier of the entity
* @param e the entity to store
* @param overwritten whether to overwrite the existing entity
* @param <E> the type of the entity
* @throws IOException if the store operation fails
* @throws EntityAlreadyExistsException if the entity already exists and the overwritten flag is
* set to false
*/
<E extends Entity & HasIdentifier> void storeEntity(E entity) throws IOException;
<E extends Entity & HasIdentifier> void put(NameIdentifier ident, E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException;

/**
* Retrieve the entity from the underlying storage.
* Get the entity from the underlying storage.
*
* <p>Note. The implementation should be thread-safe, and should be able to handle concurrent
* retrieve of entities.
Expand All @@ -44,6 +97,25 @@ public interface EntityStore extends Closeable {
* @throws NoSuchEntityException if the entity does not exist
* @throws IOException if the retrieve operation fails
*/
<E extends Entity & HasIdentifier> E retrieveEntity(HasIdentifier ident)
<E extends Entity & HasIdentifier> E get(NameIdentifier ident)
throws NoSuchEntityException, IOException;

/**
* Delete the entity from the underlying storage by the specified {@link NameIdentifier}.
*
* @param ident the name identifier of the entity
* @return true if the entity is deleted, false otherwise
* @throws IOException if the delete operation fails
*/
boolean delete(NameIdentifier ident) throws IOException;

/**
* Execute the specified {@link Executable} in a transaction.
*
* @param executable the executable to run
* @param <R> the type of the return value
* @return the return value of the executable
* @throws IOException if the execution fails
*/
<R> R executeInTransaction(Executable<R> executable) throws IOException;
}
35 changes: 35 additions & 0 deletions core/src/main/java/com/datastrato/graviton/EntityStoreFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntityStoreFactory {

private static final Logger LOG = LoggerFactory.getLogger(EntityStore.class);

// Register EntityStore's short name to its full qualified class name in the map. So that user
// don't need to specify the full qualified class name when creating an EntityStore.
private static final Map<String, String> ENTITY_STORES = ImmutableMap.of();

private EntityStoreFactory() {}

public static EntityStore createEntityStore(Config config) {
String name = config.get(configs.ENTITY_STORE);
String className = ENTITY_STORES.getOrDefault(name, name);

try {
EntityStore store = (EntityStore) Class.forName(className).newInstance();
store.initialize(config);
return store;
} catch (Exception e) {
LOG.error("Failed to create and initialize EntityStore by name {}.", name, e);
throw new RuntimeException("Failed to create and initialize EntityStore: " + name, e);
}
}
}
13 changes: 13 additions & 0 deletions core/src/main/java/com/datastrato/graviton/util/Executable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.util;

import java.io.IOException;

@FunctionalInterface
public interface Executable<R> {

R execute() throws IOException;
}
105 changes: 86 additions & 19 deletions core/src/test/java/com/datastrato/graviton/TestEntityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,104 @@
import com.datastrato.graviton.meta.*;
import com.datastrato.graviton.rel.Column;
import com.datastrato.graviton.rel.Table;
import com.datastrato.graviton.util.Executable;
import com.google.common.collect.Maps;
import io.substrait.type.TypeCreator;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestEntityStore {

public static class InMemoryEntityStore implements EntityStore {
private final Map<NameIdentifier, Entity> entityMap;

private EntitySerDe serde;

private final Lock lock;

public InMemoryEntityStore() {
this.entityMap = Maps.newHashMap();
this.entityMap = Maps.newConcurrentMap();
this.lock = new ReentrantLock();
}

@Override
public void initialize(Config config) throws RuntimeException {}

@Override
public synchronized <E extends Entity & HasIdentifier> void storeEntity(E entity)
throws IOException {
entityMap.put(entity.nameIdentifier(), entity);
public void setSerDe(EntitySerDe entitySerDe) {
this.serde = entitySerDe;
}

@Override
public synchronized <E extends Entity & HasIdentifier> E retrieveEntity(HasIdentifier ident)
throws NoSuchEntityException, IOException {
if (entityMap.containsKey(ident.nameIdentifier())) {
return (E) entityMap.get(ident.nameIdentifier());
public <E extends Entity & HasIdentifier> List<E> list(Namespace namespace) throws IOException {
return entityMap.entrySet().stream()
.filter(e -> e.getKey().namespace().equals(namespace))
.map(entry -> (E) entry.getValue())
.collect(Collectors.toList());
}

@Override
public boolean exists(NameIdentifier ident) throws IOException {
return entityMap.containsKey(ident);
}

@Override
public <E extends Entity & HasIdentifier> void put(
NameIdentifier ident, E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
if (overwritten) {
entityMap.put(ident, e);
} else {
throw new NoSuchEntityException("Entity " + ident.nameIdentifier() + " does not exist");
executeInTransaction(
() -> {
if (exists(ident)) {
throw new EntityAlreadyExistsException("Entity " + ident + " already exists");
}
entityMap.put(ident, e);
return null;
});
}
}

@Override
public void close() throws IOException {}
public <E extends Entity & HasIdentifier> E get(NameIdentifier ident)
throws NoSuchEntityException, IOException {
E e = (E) entityMap.get(ident);
if (e == null) {
throw new NoSuchEntityException("Entity " + ident + " does not exist");
}

return e;
}

@Override
public boolean delete(NameIdentifier ident) throws IOException {
Entity prev = entityMap.remove(ident);
return prev != null;
}

@Override
public <R> R executeInTransaction(Executable<R> executable) throws IOException {
try {
lock.lock();
return executable.execute();
} finally {
lock.unlock();
}
}

@Override
public void close() throws IOException {
entityMap.clear();
}
}

@Test
Expand Down Expand Up @@ -82,21 +142,28 @@ public void testEntityStoreAndRetrieve() throws Exception {
new Column[] {column});

InMemoryEntityStore store = new InMemoryEntityStore();
store.storeEntity(metalake);
store.storeEntity(catalog);
store.storeEntity(table);
store.storeEntity(column);
store.initialize(Mockito.mock(Config.class));
store.setSerDe(Mockito.mock(EntitySerDe.class));

store.put(metalake.nameIdentifier(), metalake);
store.put(catalog.nameIdentifier(), catalog);
store.put(table.nameIdentifier(), table);

Metalake retrievedMetalake = store.retrieveEntity(metalake);
Metalake retrievedMetalake = store.get(metalake.nameIdentifier());
Assertions.assertEquals(metalake, retrievedMetalake);

CatalogEntity retrievedCatalog = store.retrieveEntity(catalog);
CatalogEntity retrievedCatalog = store.get(catalog.nameIdentifier());
Assertions.assertEquals(catalog, retrievedCatalog);

Table retrievedTable = store.retrieveEntity(table);
Table retrievedTable = store.get(table.nameIdentifier());
Assertions.assertEquals(table, retrievedTable);

Column retrievedColumn = store.retrieveEntity(column);
Assertions.assertEquals(column, retrievedColumn);
store.delete(metalake.nameIdentifier());
Assertions.assertThrows(
NoSuchEntityException.class, () -> store.get(metalake.nameIdentifier()));

Assertions.assertThrows(
EntityAlreadyExistsException.class,
() -> store.put(catalog.nameIdentifier(), catalog, false));
}
}

0 comments on commit 89682eb

Please sign in to comment.