From d3ce3702da48aa28ed53242d290755b5629d9762 Mon Sep 17 00:00:00 2001 From: xiaojiebao Date: Fri, 2 Feb 2024 11:43:49 +0800 Subject: [PATCH] add rel interface --- .../com/datastrato/gravitino/Configs.java | 11 ++ .../gravitino/EntityStoreFactory.java | 7 +- .../datastrato/gravitino/GravitinoEnv.java | 5 +- .../storage/relation/RelationBackend.java | 41 +++++++ .../storage/relation/RelationEntityStore.java | 111 ++++++++++++++++++ .../storage/relation/mysql/MysqlBackend.java | 67 +++++++++++ 6 files changed, 240 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relation/RelationBackend.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relation/RelationEntityStore.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MysqlBackend.java diff --git a/core/src/main/java/com/datastrato/gravitino/Configs.java b/core/src/main/java/com/datastrato/gravitino/Configs.java index 3b3d6246f69..60f1fb5571c 100644 --- a/core/src/main/java/com/datastrato/gravitino/Configs.java +++ b/core/src/main/java/com/datastrato/gravitino/Configs.java @@ -11,11 +11,15 @@ public interface Configs { String DEFAULT_ENTITY_STORE = "kv"; + String RELATION_ENTITY_STORE = "rel"; String ENTITY_STORE_KEY = "gravitino.entity.store"; String DEFAULT_ENTITY_KV_STORE = "RocksDBKvBackend"; String ENTITY_KV_STORE_KEY = "gravitino.entity.store.kv"; + String DEFAULT_ENTITY_RELATION_STORE = "MysqlBackend"; + String ENTITY_RELATION_STORE_KEY = "gravitino.entity.store.rel"; + String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY = "gravitino.entity.store.kv.rocksdbPath"; Long DEFAULT_KV_DELETE_AFTER_TIME = 604800000L; // 7 days @@ -45,6 +49,13 @@ public interface Configs { .stringConf() .createWithDefault(DEFAULT_ENTITY_KV_STORE); + ConfigEntry ENTITY_RELATION_STORE = + new ConfigBuilder(ENTITY_RELATION_STORE_KEY) + .doc("Detailed implementation of relation storage") + .version("0.1.0") + .stringConf() + .createWithDefault(DEFAULT_ENTITY_RELATION_STORE); + ConfigEntry ENTRY_KV_ROCKSDB_BACKEND_PATH = new ConfigBuilder(ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY) .doc("Directory path of `RocksDBKvBackend`") diff --git a/core/src/main/java/com/datastrato/gravitino/EntityStoreFactory.java b/core/src/main/java/com/datastrato/gravitino/EntityStoreFactory.java index 8f1a0367821..b1beed608b6 100644 --- a/core/src/main/java/com/datastrato/gravitino/EntityStoreFactory.java +++ b/core/src/main/java/com/datastrato/gravitino/EntityStoreFactory.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino; import com.datastrato.gravitino.storage.kv.KvEntityStore; +import com.datastrato.gravitino.storage.relation.RelationEntityStore; import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +21,11 @@ public class EntityStoreFactory { // Register EntityStore's short name to its full qualified class name in the map. So that user // doesn't need to specify the full qualified class name when creating an EntityStore. public static final ImmutableMap ENTITY_STORES = - ImmutableMap.of("kv", KvEntityStore.class.getCanonicalName()); + ImmutableMap.of( + "kv", + KvEntityStore.class.getCanonicalName(), + "rel", + RelationEntityStore.class.getCanonicalName()); // Private constructor to prevent instantiation of this factory class. private EntityStoreFactory() {} diff --git a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java index 820af076e1a..6486641ceb0 100644 --- a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java +++ b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.metrics.source.JVMMetricsSource; import com.datastrato.gravitino.storage.IdGenerator; import com.datastrato.gravitino.storage.RandomIdGenerator; +import com.datastrato.gravitino.storage.relation.RelationEntityStore; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +75,9 @@ public void initialize(Config config) { // Initialize EntityStore this.entityStore = EntityStoreFactory.createEntityStore(config); entityStore.initialize(config); - entityStore.setSerDe(entitySerDe); + if (!(entityStore instanceof RelationEntityStore)) { + entityStore.setSerDe(entitySerDe); + } // create and initialize a random id generator this.idGenerator = new RandomIdGenerator(); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/RelationBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/RelationBackend.java new file mode 100644 index 00000000000..43c1af04b1d --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/RelationBackend.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relation; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.function.Function; + +public interface RelationBackend extends Closeable { + + void initialize(Config config) throws IOException; + + List list(Namespace namespace, Entity.EntityType entityType) + throws IOException; + + boolean exists(NameIdentifier ident, Entity.EntityType entityType) throws IOException; + + void put(E e, boolean overwritten) + throws IOException, EntityAlreadyExistsException; + + E update( + NameIdentifier ident, Entity.EntityType entityType, Function updater) + throws IOException, NoSuchEntityException, AlreadyExistsException; + + E get(NameIdentifier ident, Entity.EntityType entityType) + throws NoSuchEntityException, IOException; + + boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) + throws IOException; +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/RelationEntityStore.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/RelationEntityStore.java new file mode 100644 index 00000000000..a8a62b449f5 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/RelationEntityStore.java @@ -0,0 +1,111 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relation; + +import static com.datastrato.gravitino.Configs.ENTITY_RELATION_STORE; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import com.datastrato.gravitino.EntitySerDe; +import com.datastrato.gravitino.EntityStore; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.storage.relation.mysql.MysqlBackend; +import com.datastrato.gravitino.utils.Executable; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RelationEntityStore implements EntityStore { + private static final Logger LOGGER = LoggerFactory.getLogger(RelationEntityStore.class); + public static final ImmutableMap RELATION_BACKENDS = + ImmutableMap.of("MysqlBackend", MysqlBackend.class.getCanonicalName()); + private RelationBackend backend; + + @Override + public void initialize(Config config) throws RuntimeException { + this.backend = createRelationEntityBackend(config); + } + + private static RelationBackend createRelationEntityBackend(Config config) { + String backendName = config.get(ENTITY_RELATION_STORE); + String className = RELATION_BACKENDS.getOrDefault(backendName, backendName); + if (Objects.isNull(className)) { + throw new RuntimeException("Unsupported backend type..." + backendName); + } + + try { + RelationBackend relationBackend = + (RelationBackend) Class.forName(className).getDeclaredConstructor().newInstance(); + relationBackend.initialize(config); + return relationBackend; + } catch (Exception e) { + LOGGER.error("Failed to create and initialize RelationBackend by name '{}'.", backendName, e); + throw new RuntimeException( + "Failed to create and initialize RelationBackend by name: " + backendName, e); + } + } + + @Override + public void setSerDe(EntitySerDe entitySerDe) { + throw new UnsupportedOperationException("Unsupported operation in relation entity store."); + } + + @Override + public List list( + Namespace namespace, Class type, Entity.EntityType entityType) throws IOException { + return backend.list(namespace, entityType); + } + + @Override + public boolean exists(NameIdentifier ident, Entity.EntityType entityType) throws IOException { + return backend.exists(ident, entityType); + } + + @Override + public void put(E e, boolean overwritten) + throws IOException, EntityAlreadyExistsException { + backend.put(e, overwritten); + } + + @Override + public E update( + NameIdentifier ident, Class type, Entity.EntityType entityType, Function updater) + throws IOException, NoSuchEntityException, AlreadyExistsException { + return backend.update(ident, entityType, updater); + } + + @Override + public E get( + NameIdentifier ident, Entity.EntityType entityType, Class e) + throws NoSuchEntityException, IOException { + return backend.get(ident, entityType); + } + + @Override + public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) + throws IOException { + return backend.delete(ident, entityType, cascade); + } + + @Override + public R executeInTransaction(Executable executable) + throws E, IOException { + throw new UnsupportedOperationException("Unsupported operation in relation entity store."); + } + + @Override + public void close() throws IOException { + backend.close(); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MysqlBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MysqlBackend.java new file mode 100644 index 00000000000..508f81a83ce --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MysqlBackend.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relation.mysql; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.AlreadyExistsException; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.storage.relation.RelationBackend; +import java.io.IOException; +import java.util.List; +import java.util.function.Function; + +public class MysqlBackend implements RelationBackend { + @Override + public void initialize(Config config) throws IOException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public List list( + Namespace namespace, Entity.EntityType entityType) throws IOException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public boolean exists(NameIdentifier ident, Entity.EntityType entityType) throws IOException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public void put(E e, boolean overwritten) + throws IOException, EntityAlreadyExistsException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public E update( + NameIdentifier ident, Entity.EntityType entityType, Function updater) + throws IOException, NoSuchEntityException, AlreadyExistsException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public E get( + NameIdentifier ident, Entity.EntityType entityType) + throws NoSuchEntityException, IOException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) + throws IOException { + throw new UnsupportedOperationException("Unsupported operation now."); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException("Unsupported operation now."); + } +}