diff --git a/core/build.gradle.kts b/core/build.gradle.kts
index cfb4966fa6f..c947dde4353 100644
--- a/core/build.gradle.kts
+++ b/core/build.gradle.kts
@@ -31,4 +31,5 @@ dependencies {
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
+ testImplementation(libs.mockito.core)
}
diff --git a/core/src/main/java/com/datastrato/graviton/EntityAlreadyExistsException.java b/core/src/main/java/com/datastrato/graviton/EntityAlreadyExistsException.java
new file mode 100644
index 00000000000..e2a84fde812
--- /dev/null
+++ b/core/src/main/java/com/datastrato/graviton/EntityAlreadyExistsException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Datastrato.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.graviton;
+
+/**
+ * Unchecked exception signalling that an entity already exists. {@link EntityStore} documents
+ * throwing it when an entity is stored without the overwrite flag and it is already present.
+ */
+public class EntityAlreadyExistsException extends RuntimeException {
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message the detail message
+   */
+  public EntityAlreadyExistsException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with a detail message and the underlying cause.
+   *
+   * @param message the detail message
+   * @param cause the underlying cause of this exception
+   */
+  public EntityAlreadyExistsException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/core/src/main/java/com/datastrato/graviton/EntityNotEmptyException.java b/core/src/main/java/com/datastrato/graviton/EntityNotEmptyException.java
deleted file mode 100644
index facf7d68ca1..00000000000
--- a/core/src/main/java/com/datastrato/graviton/EntityNotEmptyException.java
+++ /dev/null
@@ -1,15 +0,0 @@
-/*
- * Copyright 2023 Datastrato.
- * This software is licensed under the Apache License version 2.
- */
-package com.datastrato.graviton;
-
-public class EntityNotEmptyException extends RuntimeException {
- public EntityNotEmptyException(String message) {
- super(message);
- }
-
- public EntityNotEmptyException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git a/core/src/main/java/com/datastrato/graviton/EntityStore.java b/core/src/main/java/com/datastrato/graviton/EntityStore.java
index 89f619b5085..d283b6f8583 100644
--- a/core/src/main/java/com/datastrato/graviton/EntityStore.java
+++ b/core/src/main/java/com/datastrato/graviton/EntityStore.java
@@ -4,8 +4,10 @@
*/
package com.datastrato.graviton;
+import com.datastrato.graviton.util.Executable;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
public interface EntityStore extends Closeable {
@@ -21,19 +23,70 @@ public interface EntityStore extends Closeable {
void initialize(Config config) throws RuntimeException;
/**
-   * Store the entity into the underlying storage.
+   * Set the {@link EntitySerDe} for the entity store. It is used to serialize entities to the
+   * target format and to deserialize them back.
    *
-   * <p>Note. The implementation should be thread-safe, and should be able to handle concurrent
+   * @param entitySerDe the entity serde to set
+   */
+  void setSerDe(EntitySerDe entitySerDe);
+
+  /**
+   * List all the entities with the specified {@link Namespace}, and deserialize them into the
+   * specified {@link Entity} object.
+   *
+   * <p>Note. Depending on the isolation level provided by the underlying storage, the returned
+   * list may not be consistent.
+   *
+   * @param namespace the namespace of the entities
+   * @param <E> the type of the entity
+   * @return the list of entities
+   * @throws IOException if the list operation fails
+   */
+  List list(Namespace namespace) throws IOException;
+
+  /**
+   * Check if the entity with the specified {@link NameIdentifier} exists.
+   *
+   * @param ident the name identifier of the entity
+   * @return {@code true} if the entity exists, {@code false} otherwise
+   * @throws IOException if the check operation fails
+   */
+  boolean exists(NameIdentifier ident) throws IOException;
+
+  /**
+   * Store the entity into the underlying storage. If the entity already exists, it will overwrite
+   * the existing entity; this is shorthand for {@code put(ident, e, true)}.
+   *
+   * @param ident the unique identifier of the entity
+   * @param e the entity to store
+   * @param <E> the type of the entity
+   * @throws IOException if the store operation fails
+   */
+  default void put(NameIdentifier ident, E e)
+      throws IOException {
+    put(ident, e, true);
+  }
+
+  /**
+   * Store the entity into the underlying storage. Depending on the {@code overwritten} flag, it
+   * will overwrite the existing entity or throw an {@link EntityAlreadyExistsException}.
+   *
+   * <p>Note. The implementation should be transactional, and should be able to handle concurrent
    * store of entities.
    *
-   * @param entity the entity to store
+   * @param ident the unique identifier of the entity
+   * @param e the entity to store
+   * @param overwritten whether to overwrite the existing entity
    * @param <E> the type of the entity
    * @throws IOException if the store operation fails
+   * @throws EntityAlreadyExistsException if the entity already exists and {@code overwritten} is
+   *     {@code false}
    */
-  void storeEntity(E entity) throws IOException;
+  void put(NameIdentifier ident, E e, boolean overwritten)
+      throws IOException, EntityAlreadyExistsException;
/**
- * Retrieve the entity from the underlying storage.
+ * Get the entity from the underlying storage.
*
* Note. The implementation should be thread-safe, and should be able to handle concurrent
* retrieve of entities.
@@ -44,6 +97,25 @@ public interface EntityStore extends Closeable {
* @throws NoSuchEntityException if the entity does not exist
* @throws IOException if the retrieve operation fails
*/
- E retrieveEntity(HasIdentifier ident)
+ E get(NameIdentifier ident)
throws NoSuchEntityException, IOException;
+
+  /**
+   * Delete the entity from the underlying storage by the specified {@link NameIdentifier}.
+   *
+   * @param ident the name identifier of the entity
+   * @return {@code true} if the entity is deleted, {@code false} otherwise
+   * @throws IOException if the delete operation fails
+   */
+  boolean delete(NameIdentifier ident) throws IOException;
+
+  /**
+   * Execute the specified {@link Executable} in a transaction.
+   *
+   * @param executable the executable to run
+   * @param <R> the type of the return value
+   * @return the return value of the executable
+   * @throws IOException if the execution fails
+   */
+  R executeInTransaction(Executable executable) throws IOException;
}
diff --git a/core/src/main/java/com/datastrato/graviton/EntityStoreFactory.java b/core/src/main/java/com/datastrato/graviton/EntityStoreFactory.java
new file mode 100644
index 00000000000..eab3bf93aa6
--- /dev/null
+++ b/core/src/main/java/com/datastrato/graviton/EntityStoreFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 Datastrato.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.graviton;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static factory that instantiates and initializes the {@link EntityStore} named by the
+ * {@code configs.ENTITY_STORE} configuration entry.
+ */
+public class EntityStoreFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EntityStoreFactory.class);
+
+  // Register EntityStore's short name to its fully qualified class name in the map, so that users
+  // don't need to specify the fully qualified class name when creating an EntityStore.
+  private static final Map ENTITY_STORES = ImmutableMap.of();
+
+  private EntityStoreFactory() {}
+
+  /**
+   * Create and initialize an {@link EntityStore} from the given configuration.
+   *
+   * <p>The configured name is first looked up in the short-name registry; a name that is not
+   * registered is treated as a fully qualified class name. The class is instantiated reflectively
+   * through its no-argument constructor and then initialized with {@code config}.
+   *
+   * @param config the configuration that names the store and is passed to
+   *     {@link EntityStore#initialize(Config)}
+   * @return the initialized entity store
+   * @throws RuntimeException if the class cannot be loaded or instantiated, or if initialization
+   *     fails; the original failure is attached as the cause
+   */
+  public static EntityStore createEntityStore(Config config) {
+    String name = config.get(configs.ENTITY_STORE);
+    String className = ENTITY_STORES.getOrDefault(name, name);
+
+    try {
+      // Class#newInstance is deprecated since Java 9 and propagates checked constructor
+      // exceptions undeclared; go through the no-arg constructor explicitly instead.
+      EntityStore store =
+          (EntityStore) Class.forName(className).getDeclaredConstructor().newInstance();
+      store.initialize(config);
+      return store;
+    } catch (Exception e) {
+      LOG.error("Failed to create and initialize EntityStore by name {}.", name, e);
+      throw new RuntimeException("Failed to create and initialize EntityStore: " + name, e);
+    }
+  }
+}
diff --git a/core/src/main/java/com/datastrato/graviton/util/Executable.java b/core/src/main/java/com/datastrato/graviton/util/Executable.java
new file mode 100644
index 00000000000..171755af3f7
--- /dev/null
+++ b/core/src/main/java/com/datastrato/graviton/util/Executable.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2023 Datastrato.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.graviton.util;
+
+import java.io.IOException;
+
+/**
+ * A unit of work that produces a result and may fail with an {@link IOException}.
+ *
+ * <p>Being a functional interface, it can be supplied as a lambda or method reference.
+ */
+@FunctionalInterface
+public interface Executable {
+
+  /**
+   * Run the work.
+   *
+   * @return the result of the work
+   * @throws IOException if the work fails
+   */
+  R execute() throws IOException;
+}
diff --git a/core/src/test/java/com/datastrato/graviton/TestEntityStore.java b/core/src/test/java/com/datastrato/graviton/TestEntityStore.java
index 144d4b25b43..1b56ba8bc13 100644
--- a/core/src/test/java/com/datastrato/graviton/TestEntityStore.java
+++ b/core/src/test/java/com/datastrato/graviton/TestEntityStore.java
@@ -7,44 +7,104 @@
import com.datastrato.graviton.meta.*;
import com.datastrato.graviton.rel.Column;
import com.datastrato.graviton.rel.Table;
+import com.datastrato.graviton.util.Executable;
import com.google.common.collect.Maps;
import io.substrait.type.TypeCreator;
import java.io.IOException;
import java.time.Instant;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
public class TestEntityStore {
public static class InMemoryEntityStore implements EntityStore {
private final Map entityMap;
+    // Serde supplied through setSerDe; the visible code only stores it.
+    private EntitySerDe serde;
+
+    // Guards the callbacks run by executeInTransaction.
+    private final Lock lock;
+
     public InMemoryEntityStore() {
-      this.entityMap = Maps.newHashMap();
+      // Concurrent map: list/exists/get/delete access it without holding the lock.
+      this.entityMap = Maps.newConcurrentMap();
+      this.lock = new ReentrantLock();
     }
+    /** Does nothing; the supplied config is ignored. */
     @Override
     public void initialize(Config config) throws RuntimeException {}
+    /** Remembers the serde passed in; nothing in the visible class reads it back. */
     @Override
-    public synchronized void storeEntity(E entity)
-        throws IOException {
-      entityMap.put(entity.nameIdentifier(), entity);
+    public void setSerDe(EntitySerDe entitySerDe) {
+      this.serde = entitySerDe;
     }
+    /** Returns the stored entities whose identifier's namespace equals {@code namespace}. */
     @Override
-    public synchronized E retrieveEntity(HasIdentifier ident)
-        throws NoSuchEntityException, IOException {
-      if (entityMap.containsKey(ident.nameIdentifier())) {
-        return (E) entityMap.get(ident.nameIdentifier());
+    public List list(Namespace namespace) throws IOException {
+      return entityMap.entrySet().stream()
+          .filter(e -> e.getKey().namespace().equals(namespace))
+          .map(entry -> (E) entry.getValue())
+          .collect(Collectors.toList());
+    }
+
+    /** Reports whether an entity is stored under {@code ident}. */
+    @Override
+    public boolean exists(NameIdentifier ident) throws IOException {
+      return entityMap.containsKey(ident);
+    }
+
+    /**
+     * Stores {@code e} under {@code ident}.
+     *
+     * <p>With {@code overwritten} set, any existing entry is replaced. Otherwise the entity is
+     * inserted only if {@code ident} is absent. The insert still runs inside
+     * {@code executeInTransaction}, but uses {@link Map#putIfAbsent} so that the existence check
+     * and the write are one atomic operation on the concurrent map. A separate check-then-put
+     * could be interleaved with the unlocked overwrite path and silently replace an entity.
+     *
+     * @throws EntityAlreadyExistsException if {@code overwritten} is {@code false} and an entity
+     *     is already stored under {@code ident}
+     */
+    @Override
+    public void put(
+        NameIdentifier ident, E e, boolean overwritten)
+        throws IOException, EntityAlreadyExistsException {
+      if (overwritten) {
+        entityMap.put(ident, e);
       } else {
-        throw new NoSuchEntityException("Entity " + ident.nameIdentifier() + " does not exist");
+        executeInTransaction(
+            () -> {
+              // A non-null result means another entity was already stored under ident.
+              if (entityMap.putIfAbsent(ident, e) != null) {
+                throw new EntityAlreadyExistsException("Entity " + ident + " already exists");
+              }
+              return null;
+            });
       }
     }
+    /** Returns the entity stored under {@code ident}, or throws if there is none. */
     @Override
-    public void close() throws IOException {}
+    public E get(NameIdentifier ident)
+        throws NoSuchEntityException, IOException {
+      E e = (E) entityMap.get(ident);
+      if (e == null) {
+        throw new NoSuchEntityException("Entity " + ident + " does not exist");
+      }
+
+      return e;
+    }
+
+    /** Removes the entry for {@code ident}; returns whether one was present. */
+    @Override
+    public boolean delete(NameIdentifier ident) throws IOException {
+      Entity prev = entityMap.remove(ident);
+      return prev != null;
+    }
+
+    /**
+     * Runs {@code executable} while holding this store's lock and returns its result.
+     *
+     * <p>The lock is acquired before entering the {@code try} block so that {@code finally} only
+     * unlocks a lock that is actually held. It is released whether the callback returns or
+     * throws.
+     */
+    @Override
+    public R executeInTransaction(Executable executable) throws IOException {
+      lock.lock();
+      try {
+        return executable.execute();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Clears the in-memory entity map. */
+    @Override
+    public void close() throws IOException {
+      entityMap.clear();
+    }
}
@Test
@@ -82,21 +142,28 @@ public void testEntityStoreAndRetrieve() throws Exception {
new Column[] {column});
InMemoryEntityStore store = new InMemoryEntityStore();
- store.storeEntity(metalake);
- store.storeEntity(catalog);
- store.storeEntity(table);
- store.storeEntity(column);
+ store.initialize(Mockito.mock(Config.class));
+ store.setSerDe(Mockito.mock(EntitySerDe.class));
+
+ store.put(metalake.nameIdentifier(), metalake);
+ store.put(catalog.nameIdentifier(), catalog);
+ store.put(table.nameIdentifier(), table);
- Metalake retrievedMetalake = store.retrieveEntity(metalake);
+ Metalake retrievedMetalake = store.get(metalake.nameIdentifier());
Assertions.assertEquals(metalake, retrievedMetalake);
- CatalogEntity retrievedCatalog = store.retrieveEntity(catalog);
+ CatalogEntity retrievedCatalog = store.get(catalog.nameIdentifier());
Assertions.assertEquals(catalog, retrievedCatalog);
- Table retrievedTable = store.retrieveEntity(table);
+ Table retrievedTable = store.get(table.nameIdentifier());
Assertions.assertEquals(table, retrievedTable);
- Column retrievedColumn = store.retrieveEntity(column);
- Assertions.assertEquals(column, retrievedColumn);
+ store.delete(metalake.nameIdentifier());
+ Assertions.assertThrows(
+ NoSuchEntityException.class, () -> store.get(metalake.nameIdentifier()));
+
+ Assertions.assertThrows(
+ EntityAlreadyExistsException.class,
+ () -> store.put(catalog.nameIdentifier(), catalog, false));
}
}