Skip to content

Commit

Permalink
add rel interface
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Feb 2, 2024
1 parent 0cc0400 commit d3ce370
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 2 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
public interface Configs {

String DEFAULT_ENTITY_STORE = "kv";
String RELATION_ENTITY_STORE = "rel";
String ENTITY_STORE_KEY = "gravitino.entity.store";

String DEFAULT_ENTITY_KV_STORE = "RocksDBKvBackend";
String ENTITY_KV_STORE_KEY = "gravitino.entity.store.kv";

String DEFAULT_ENTITY_RELATION_STORE = "MysqlBackend";
String ENTITY_RELATION_STORE_KEY = "gravitino.entity.store.rel";

String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY = "gravitino.entity.store.kv.rocksdbPath";

Long DEFAULT_KV_DELETE_AFTER_TIME = 604800000L; // 7 days
Expand Down Expand Up @@ -45,6 +49,13 @@ public interface Configs {
.stringConf()
.createWithDefault(DEFAULT_ENTITY_KV_STORE);

ConfigEntry<String> ENTITY_RELATION_STORE =
new ConfigBuilder(ENTITY_RELATION_STORE_KEY)
.doc("Detailed implementation of relation storage")
.version("0.1.0")
.stringConf()
.createWithDefault(DEFAULT_ENTITY_RELATION_STORE);

ConfigEntry<String> ENTRY_KV_ROCKSDB_BACKEND_PATH =
new ConfigBuilder(ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY)
.doc("Directory path of `RocksDBKvBackend`")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino;

import com.datastrato.gravitino.storage.kv.KvEntityStore;
import com.datastrato.gravitino.storage.relation.RelationEntityStore;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,7 +21,11 @@ public class EntityStoreFactory {
// Register EntityStore's short name to its full qualified class name in the map. So that user
// doesn't need to specify the full qualified class name when creating an EntityStore.
public static final ImmutableMap<String, String> ENTITY_STORES =
ImmutableMap.of("kv", KvEntityStore.class.getCanonicalName());
ImmutableMap.of(
"kv",
KvEntityStore.class.getCanonicalName(),
"rel",
RelationEntityStore.class.getCanonicalName());

// Private constructor to prevent instantiation of this factory class.
private EntityStoreFactory() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.metrics.source.JVMMetricsSource;
import com.datastrato.gravitino.storage.IdGenerator;
import com.datastrato.gravitino.storage.RandomIdGenerator;
import com.datastrato.gravitino.storage.relation.RelationEntityStore;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,7 +75,9 @@ public void initialize(Config config) {
// Initialize EntityStore
this.entityStore = EntityStoreFactory.createEntityStore(config);
entityStore.initialize(config);
entityStore.setSerDe(entitySerDe);
if (!(entityStore instanceof RelationEntityStore)) {
entityStore.setSerDe(entitySerDe);
}

// create and initialize a random id generator
this.idGenerator = new RandomIdGenerator();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relation;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;

public interface RelationBackend extends Closeable {

void initialize(Config config) throws IOException;

<E extends Entity & HasIdentifier> List<E> list(Namespace namespace, Entity.EntityType entityType)
throws IOException;

boolean exists(NameIdentifier ident, Entity.EntityType entityType) throws IOException;

<E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException;

<E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Entity.EntityType entityType, Function<E, E> updater)
throws IOException, NoSuchEntityException, AlreadyExistsException;

<E extends Entity & HasIdentifier> E get(NameIdentifier ident, Entity.EntityType entityType)
throws NoSuchEntityException, IOException;

boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade)
throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relation;

import static com.datastrato.gravitino.Configs.ENTITY_RELATION_STORE;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.EntitySerDe;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.storage.relation.mysql.MysqlBackend;
import com.datastrato.gravitino.utils.Executable;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RelationEntityStore implements EntityStore {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationEntityStore.class);
public static final ImmutableMap<String, String> RELATION_BACKENDS =
ImmutableMap.of("MysqlBackend", MysqlBackend.class.getCanonicalName());
private RelationBackend backend;

@Override
public void initialize(Config config) throws RuntimeException {
this.backend = createRelationEntityBackend(config);
}

private static RelationBackend createRelationEntityBackend(Config config) {
String backendName = config.get(ENTITY_RELATION_STORE);
String className = RELATION_BACKENDS.getOrDefault(backendName, backendName);
if (Objects.isNull(className)) {
throw new RuntimeException("Unsupported backend type..." + backendName);
}

try {
RelationBackend relationBackend =
(RelationBackend) Class.forName(className).getDeclaredConstructor().newInstance();
relationBackend.initialize(config);
return relationBackend;
} catch (Exception e) {
LOGGER.error("Failed to create and initialize RelationBackend by name '{}'.", backendName, e);
throw new RuntimeException(
"Failed to create and initialize RelationBackend by name: " + backendName, e);
}
}

@Override
public void setSerDe(EntitySerDe entitySerDe) {
throw new UnsupportedOperationException("Unsupported operation in relation entity store.");
}

@Override
public <E extends Entity & HasIdentifier> List<E> list(
Namespace namespace, Class<E> type, Entity.EntityType entityType) throws IOException {
return backend.list(namespace, entityType);
}

@Override
public boolean exists(NameIdentifier ident, Entity.EntityType entityType) throws IOException {
return backend.exists(ident, entityType);
}

@Override
public <E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
backend.put(e, overwritten);
}

@Override
public <E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Class<E> type, Entity.EntityType entityType, Function<E, E> updater)
throws IOException, NoSuchEntityException, AlreadyExistsException {
return backend.update(ident, entityType, updater);
}

@Override
public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType, Class<E> e)
throws NoSuchEntityException, IOException {
return backend.get(ident, entityType);
}

@Override
public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade)
throws IOException {
return backend.delete(ident, entityType, cascade);
}

@Override
public <R, E extends Exception> R executeInTransaction(Executable<R, E> executable)
throws E, IOException {
throw new UnsupportedOperationException("Unsupported operation in relation entity store.");
}

@Override
public void close() throws IOException {
backend.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relation.mysql;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.storage.relation.RelationBackend;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;

public class MysqlBackend implements RelationBackend {
@Override
public void initialize(Config config) throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public <E extends Entity & HasIdentifier> List<E> list(
Namespace namespace, Entity.EntityType entityType) throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public boolean exists(NameIdentifier ident, Entity.EntityType entityType) throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public <E extends Entity & HasIdentifier> void put(E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public <E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Entity.EntityType entityType, Function<E, E> updater)
throws IOException, NoSuchEntityException, AlreadyExistsException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType)
throws NoSuchEntityException, IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade)
throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}

@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}
}

0 comments on commit d3ce370

Please sign in to comment.