Skip to content

Commit

Permalink
add the code skeleton for mysql backend operating metalake
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Feb 4, 2024
1 parent 82e18e0 commit 92ca94b
Show file tree
Hide file tree
Showing 11 changed files with 594 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
Expand Down Expand Up @@ -232,6 +234,7 @@ public static ObjectMapper objectMapper() {
.configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
.build()
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.registerModule(new JavaTimeModule());
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ dependencies {
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.commons.io)
implementation(libs.commons.dbcp2)
implementation(libs.caffeine)
implementation(libs.rocksdbjni)
implementation(libs.mybatis)
implementation(libs.bundles.metrics)
implementation(libs.bundles.prometheus)

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public interface Configs {
String DEFAULT_ENTITY_RELATION_STORE = "MySQLBackend";
String ENTITY_RELATION_STORE_KEY = "gravitino.entity.store.rel";

String MYSQL_ENTITY_STORE_URL_KEY = "gravitino.entity.store.mysql.url";
String MYSQL_ENTITY_STORE_DRIVER_NAME_KEY = "gravitino.entity.store.mysql.driverName";
String MYSQL_ENTITY_STORE_USERNAME_KEY = "gravitino.entity.store.mysql.username";
String MYSQL_ENTITY_STORE_PASSWORD_KEY = "gravitino.entity.store.mysql.password";

String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY = "gravitino.entity.store.kv.rocksdbPath";

Long DEFAULT_KV_DELETE_AFTER_TIME = 604800000L; // 7 days
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,23 @@
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.storage.relation.RelationBackend;
import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessionFactoryHelper;
import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessions;
import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO;
import com.datastrato.gravitino.storage.relation.mysql.utils.POConverters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

/**
* {@link MySQLBackend} is a MySQL implementation of RelationBackend interface. If we want to use
Expand All @@ -26,46 +38,185 @@ public class MySQLBackend implements RelationBackend {
/** Initialize the MySQL backend instance. */
@Override
public void initialize(Config config) {
throw new UnsupportedOperationException("Unsupported operation now.");
SqlSessionFactoryHelper.getInstance().init(config);
}

@Override
public <E extends Entity & HasIdentifier> List<E> list(
Namespace namespace, Entity.EntityType entityType) throws NoSuchEntityException {
throw new UnsupportedOperationException("Unsupported operation now.");
Namespace namespace, Entity.EntityType entityType) {
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
List<MetalakePO> metalakePOS =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.listMetalakePOs();
return (List<E>)
metalakePOS.stream()
.map(
metalakePO -> {
try {
return POConverters.fromMetalakePO(metalakePO);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for list operation", entityType));
}
} finally {
SqlSessions.closeSqlSession();
}
}

@Override
public boolean exists(NameIdentifier ident, Entity.EntityType entityType) {
throw new UnsupportedOperationException("Unsupported operation now.");
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
return metalakePO != null;
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for exists operation", entityType));
}
} finally {
SqlSessions.closeSqlSession();
}
}

@Override
public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
throws EntityAlreadyExistsException {
throw new UnsupportedOperationException("Unsupported operation now.");
try (SqlSession session = SqlSessions.getSqlSession()) {
try {
if (e instanceof BaseMetalake) {
MetalakePO metalakePO = POConverters.toMetalakePO((BaseMetalake) e);
if (overwritten) {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMetaWithUpdate(metalakePO);
} else {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMeta(metalakePO);
}
SqlSessions.commitAndCloseSqlSession();
} else {
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for put operation", e.getClass()));
}
} catch (Throwable t) {
SqlSessions.rollbackAndCloseSqlSession();
throw new RuntimeException(t);
}
}
}

@Override
public <E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Entity.EntityType entityType, Function<E, E> updater)
throws NoSuchEntityException {
throw new UnsupportedOperationException("Unsupported operation now.");
throws NoSuchEntityException, AlreadyExistsException {
try (SqlSession session = SqlSessions.getSqlSession()) {
try {
switch (entityType) {
case METALAKE:
BaseMetalake oldMetalakeEntity =
POConverters.fromMetalakePO(
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name()));
BaseMetalake newMetalakeEntity = (BaseMetalake) updater.apply((E) oldMetalakeEntity);
Preconditions.checkArgument(
Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()),
String.format(
"The updated metalake entity id: %s is not same with the metalake entity id before: %s",
newMetalakeEntity.id(), oldMetalakeEntity.id()));
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.updateMetalakeMeta(POConverters.toMetalakePO(newMetalakeEntity));
SqlSessions.commitAndCloseSqlSession();
return (E) newMetalakeEntity;
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for update operation", entityType));
}
} catch (Throwable t) {
SqlSessions.rollbackAndCloseSqlSession();
throw new RuntimeException(t);
}
}
}

@Override
public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType) throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
NameIdentifier ident, Entity.EntityType entityType)
throws NoSuchEntityException, IOException {
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
return (E) POConverters.fromMetalakePO(metalakePO);
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for get operation", entityType));
}
} finally {
SqlSessions.closeSqlSession();
}
}

@Override
public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) {
throw new UnsupportedOperationException("Unsupported operation now.");
try (SqlSession session = SqlSessions.getSqlSession()) {
try {
switch (entityType) {
case METALAKE:
Long metalakeId =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeIdMetaByName(ident.name());
if (metalakeId != null) {
// delete metalake
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.deleteMetalakeMetaById(metalakeId);
if (cascade) {
// TODO We will cascade delete the metadata of sub-resources under metalake
}
SqlSessions.commitAndCloseSqlSession();
}
return true;
case CATALOG:
case SCHEMA:
case TABLE:
case FILESET:
default:
throw new IllegalArgumentException(
String.format("Unsupported entity type: %s for delete operation", entityType));
}
} catch (Throwable t) {
SqlSessions.rollbackAndCloseSqlSession();
throw new RuntimeException(t);
}
}
}

@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Unsupported operation now.");
}
public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relation.mysql.mapper;

import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

public interface MetalakeMetaMapper {
String TABLE_NAME = "metalake_meta";

@Select(
"SELECT id, metalake_name as metalakeName, metalake_comment as metalakeComment,"
+ " properties, audit_info as auditInfo, schema_version as schemaVersion"
+ " FROM "
+ TABLE_NAME)
List<MetalakePO> listMetalakePOs();

@Select(
"SELECT id, metalake_name as metalakeName,"
+ " metalake_comment as metalakeComment, properties,"
+ " audit_info as auditInfo, schema_version as schemaVersion"
+ " FROM "
+ TABLE_NAME
+ " WHERE metalake_name = #{metalakeName}")
MetalakePO selectMetalakeMetaByName(@Param("metalakeName") String name);

@Select("SELECT id FROM " + TABLE_NAME + " WHERE metalake_name = #{metalakeName}")
Long selectMetalakeIdMetaByName(@Param("metalakeName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(id, metalake_name, metalake_comment, properties, audit_info, schema_version)"
+ " VALUES("
+ " #{metalakeMeta.id},"
+ " #{metalakeMeta.metalakeName},"
+ " #{metalakeMeta.metalakeComment},"
+ " #{metalakeMeta.properties},"
+ " #{metalakeMeta.auditInfo},"
+ " #{metalakeMeta.schemaVersion}"
+ " )")
void insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(id, metalake_name, metalake_comment, properties, audit_info, schema_version)"
+ " VALUES("
+ " #{metalakeMeta.id},"
+ " #{metalakeMeta.metalakeName},"
+ " #{metalakeMeta.metalakeComment},"
+ " #{metalakeMeta.properties},"
+ " #{metalakeMeta.auditInfo},"
+ " #{metalakeMeta.schemaVersion}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " metalake_name = #{metalakeMeta.metalakeName},"
+ " metalake_comment = #{metalakeMeta.metalakeComment},"
+ " properties = #{metalakeMeta.properties},"
+ " audit_info = #{metalakeMeta.auditInfo},"
+ " schema_version = #{metalakeMeta.schemaVersion}")
void insertMetalakeMetaWithUpdate(@Param("metalakeMeta") MetalakePO metalakePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET metalake_name = #{metalakeMeta.metalakeName},"
+ " metalake_comment = #{metalakeMeta.metalakeComment},"
+ " properties = #{metalakeMeta.properties},"
+ " audit_info = #{metalakeMeta.auditInfo},"
+ " schema_version = #{metalakeMeta.schemaVersion}"
+ " WHERE id = #{metalakeMeta.id}")
void updateMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO);

@Delete("DELETE FROM " + TABLE_NAME + " WHERE id = #{id}")
Integer deleteMetalakeMetaById(@Param("id") Long id);
}
Loading

0 comments on commit 92ca94b

Please sign in to comment.