Skip to content

Commit

Permalink
add mysql backend ops for catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Feb 5, 2024
1 parent 3442dbe commit dd43519
Show file tree
Hide file tree
Showing 7 changed files with 528 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.storage.relation.RelationBackend;
import com.datastrato.gravitino.storage.relation.mysql.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessionFactoryHelper;
import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessions;
import com.datastrato.gravitino.storage.relation.mysql.po.CatalogPO;
import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO;
import com.datastrato.gravitino.storage.relation.mysql.utils.POConverters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

/**
Expand All @@ -52,18 +53,18 @@ public <E extends Entity & HasIdentifier> List<E> list(
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.listMetalakePOs();
return metalakePOS != null
? metalakePOS.stream()
.map(
metalakePO -> {
try {
return (E) POConverters.fromMetalakePO(metalakePO);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList())
? (List<E>) POConverters.fromMetalakePOs(metalakePOS)
: new ArrayList<>();
case CATALOG:
Preconditions.checkArgument(
namespace.levels().length == 1,
"Catalog entity namespace should have only one level");
List<CatalogPO> catalogPOS =
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.listCatalogPOsByMetalakeId(getMetalakeIdByName(namespace.level(0)));
return catalogPOS != null
? (List<E>) POConverters.fromCatalogPOs(catalogPOS, namespace)
: new ArrayList<>();
case SCHEMA:
case TABLE:
case FILESET:
Expand All @@ -81,11 +82,22 @@ public boolean exists(NameIdentifier ident, Entity.EntityType entityType) {
try (SqlSession session = SqlSessions.getSqlSession()) {
switch (entityType) {
case METALAKE:
MetalakePO metalakePO =
Long metalakeId =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeMetaByName(ident.name());
return metalakePO != null;
.selectMetalakeIdMetaByName(ident.name());
return metalakeId != null;
case CATALOG:
Preconditions.checkArgument(
ident.hasNamespace() && ident.namespace().levels().length == 1,
"Catalog entity namespace should have only one level");
try {
Long catalogId =
getCatalogIdByNameAndMetalakeId(
ident.name(), getMetalakeIdByName(ident.namespace().level(0)));
return catalogId != null;
} catch (NoSuchEntityException e) {
return false;
}
case SCHEMA:
case TABLE:
case FILESET:
Expand All @@ -111,7 +123,6 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
throw new EntityAlreadyExistsException(
String.format("Metalake entity: %s already exists", e.nameIdentifier().name()));
}

if (overwritten) {
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.insertMetalakeMetaWithUpdate(POConverters.toMetalakePO((BaseMetalake) e));
Expand All @@ -120,6 +131,28 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
.insertMetalakeMeta(POConverters.toMetalakePO((BaseMetalake) e));
}
SqlSessions.commitAndCloseSqlSession();
} else if (e instanceof CatalogEntity) {
Preconditions.checkArgument(
e.nameIdentifier().hasNamespace()
&& e.nameIdentifier().namespace().levels().length == 1,
"Catalog entity namespace should have only one level");
Long metalakeId = getMetalakeIdByName(e.nameIdentifier().namespace().level(0));
CatalogPO catalogPO =
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.selectCatalogMetaByNameAndMetalakeId(e.nameIdentifier().name(), metalakeId);
if (!overwritten && catalogPO != null) {
throw new EntityAlreadyExistsException(
String.format("Catalog entity: %s already exists", e.nameIdentifier().name()));
}
if (overwritten) {
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.insertCatalogMetaWithUpdate(
POConverters.toCatalogPO((CatalogEntity) e, metalakeId));
} else {
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.insertCatalogMeta(POConverters.toCatalogPO((CatalogEntity) e, metalakeId));
}
SqlSessions.commitAndCloseSqlSession();
} else {
SqlSessions.closeSqlSession();
throw new IllegalArgumentException(
Expand Down Expand Up @@ -155,6 +188,26 @@ public <E extends Entity & HasIdentifier> E update(
SqlSessions.commitAndCloseSqlSession();
return (E) newMetalakeEntity;
case CATALOG:
Preconditions.checkArgument(
ident.hasNamespace() && ident.namespace().levels().length == 1,
"Catalog entity namespace should have only one level");
Long metalakeId = getMetalakeIdByName(ident.namespace().level(0));
CatalogPO oldCatalogPO =
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.selectCatalogMetaByNameAndMetalakeId(ident.name(), metalakeId);
CatalogEntity oldCatalogEntity =
POConverters.fromCatalogPO(oldCatalogPO, ident.namespace());
CatalogEntity newCatalogEntity = (CatalogEntity) updater.apply((E) oldCatalogEntity);
Preconditions.checkArgument(
Objects.equals(oldCatalogEntity.id(), newCatalogEntity.id()),
String.format(
"The updated catalog entity id: %s should same with the catalog entity id before: %s",
newCatalogEntity.id(), oldCatalogEntity.id()));
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.updateCatalogMeta(
POConverters.toCatalogPO(newCatalogEntity, metalakeId), oldCatalogPO);
SqlSessions.commitAndCloseSqlSession();
return (E) newCatalogEntity;
case SCHEMA:
case TABLE:
case FILESET:
Expand Down Expand Up @@ -184,6 +237,17 @@ public <E extends Entity & HasIdentifier> E get(
}
return (E) POConverters.fromMetalakePO(metalakePO);
case CATALOG:
Preconditions.checkArgument(
ident.hasNamespace() && ident.namespace().levels().length == 1,
"Catalog entity namespace should have only one level");
CatalogPO catalogPO =
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.selectCatalogMetaByNameAndMetalakeId(
ident.name(), getMetalakeIdByName(ident.namespace().level(0)));
if (catalogPO == null) {
return null;
}
return (E) POConverters.fromCatalogPO(catalogPO, ident.namespace());
case SCHEMA:
case TABLE:
case FILESET:
Expand All @@ -202,20 +266,32 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
try {
switch (entityType) {
case METALAKE:
Long metalakeId =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeIdMetaByName(ident.name());
if (metalakeId != null) {
// delete metalake
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.deleteMetalakeMetaById(metalakeId);
if (cascade) {
// TODO We will cascade delete the metadata of sub-resources under the metalake
}
SqlSessions.commitAndCloseSqlSession();
Long metalakeId = getMetalakeIdByName(ident.name());
// delete metalake
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.deleteMetalakeMetaById(metalakeId);
if (cascade) {
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.deleteCatalogMetasByMetalakeId(metalakeId);
// TODO delete schema, table, fileset
}
SqlSessions.commitAndCloseSqlSession();
return true;
case CATALOG:
Preconditions.checkArgument(
ident.hasNamespace() && ident.namespace().levels().length == 1,
"Catalog entity namespace should have only one level");
Long catalogId =
getCatalogIdByNameAndMetalakeId(
ident.name(), getMetalakeIdByName(ident.namespace().level(0)));
// delete catalog
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.deleteCatalogMetasById(catalogId);
if (cascade) {
// TODO delete schema, table, fileset
}
SqlSessions.commitAndCloseSqlSession();
return true;
case SCHEMA:
case TABLE:
case FILESET:
Expand All @@ -230,6 +306,26 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
}
}

private Long getMetalakeIdByName(String name) {
Long metalakeId =
((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class))
.selectMetalakeIdMetaByName(name);
if (metalakeId == null) {
throw new NoSuchEntityException(String.format("Metalake entity: %s not exists", name));
}
return metalakeId;
}

private Long getCatalogIdByNameAndMetalakeId(String name, Long metalakeId) {
Long catalogId =
((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class))
.selectCatalogIdByNameAndMetalakeId(name, metalakeId);
if (catalogId == null) {
throw new NoSuchEntityException(String.format("Catalog entity: %s not exists", name));
}
return catalogId;
}

@Override
public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relation.mysql.mapper;

import com.datastrato.gravitino.storage.relation.mysql.po.CatalogPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

public interface CatalogMetaMapper {
String TABLE_NAME = "catalog_meta";

@Select(
"SELECT id, catalog_name as catalogName, metalake_id as metalakeId,"
+ " type, provider, catalog_comment as catalogComment,"
+ " properties, audit_info as auditInfo"
+ " FROM "
+ TABLE_NAME)
List<CatalogPO> listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Select(
"SELECT id FROM "
+ TABLE_NAME
+ " WHERE catalog_name = #{catalogName} and metalake_id = #{metalakeId}")
Long selectCatalogIdByNameAndMetalakeId(
@Param("catalogName") String name, @Param("metalakeId") Long metalakeId);

@Select(
"SELECT id, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties,"
+ " audit_info as auditInfo"
+ " FROM "
+ TABLE_NAME
+ " WHERE catalog_name = #{catalogName} and metalake_id = #{metalakeId}")
CatalogPO selectCatalogMetaByNameAndMetalakeId(
@Param("catalogName") String name, @Param("metalakeId") Long metalakeId);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(id, catalog_name, metalake_id, type, provider, catalog_comment, properties, audit_info)"
+ " VALUES("
+ " #{catalogMeta.id},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo}"
+ " )")
void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(id, catalog_name, metalake_id, type, provider, metalake_comment, properties, audit_info)"
+ " VALUES("
+ " #{catalogMeta.id},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " catalog_name = #{catalogMeta.catalogName},"
+ " metalake_id = #{catalogMeta.metalakeId},"
+ " type = #{catalogMeta.type},"
+ " provider = #{catalogMeta.provider},"
+ " catalog_comment = #{catalogMeta.catalogComment},"
+ " properties = #{catalogMeta.properties},"
+ " audit_info = #{catalogMeta.auditInfo}")
void insertCatalogMetaWithUpdate(@Param("catalogMeta") CatalogPO catalogPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET catalog_name = #{newCatalogMeta.metalakeName},"
+ " metalake_id = #{newCatalogMeta.metalakeId},"
+ " type = #{newCatalogMeta.type},"
+ " provider = #{newCatalogMeta.provider},"
+ " catalog_comment = #{newCatalogMeta.catalogComment},"
+ " properties = #{newCatalogMeta.properties},"
+ " audit_info = #{newCatalogMeta.auditInfo}"
+ " WHERE id = #{oldCatalogMeta.id}"
+ " and catalog_name = #{oldCatalogMeta.catalogName}"
+ " and metalake_id = #{oldCatalogMeta.metalakeId}"
+ " and type = #{oldCatalogMeta.type}"
+ " and provider = #{oldCatalogMeta.provider}"
+ " and catalog_comment = #{oldCatalogMeta.catalogComment}"
+ " and properties = #{oldCatalogMeta.properties}"
+ " and audit_info = #{oldCatalogMeta.auditInfo}")
void updateCatalogMeta(
@Param("newCatalogMeta") CatalogPO newCatalogPO,
@Param("oldCatalogMeta") CatalogPO oldCatalogPO);

@Delete("DELETE FROM " + TABLE_NAME + " WHERE id = #{id}")
Integer deleteCatalogMetasById(@Param("id") Long id);

@Delete("DELETE FROM " + TABLE_NAME + " WHERE metalake_id = #{metalakeId}")
Integer deleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.storage.relation.mysql.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper;
import com.google.common.base.Preconditions;
import java.time.Duration;
Expand Down Expand Up @@ -68,6 +69,7 @@ public void init(Config config) {
Environment environment = new Environment("development", transactionFactory, dataSource);
Configuration configuration = new Configuration(environment);
configuration.addMapper(MetalakeMetaMapper.class);
configuration.addMapper(CatalogMetaMapper.class);
if (sqlSessionFactory == null) {
synchronized (SqlSessionFactoryHelper.class) {
if (sqlSessionFactory == null) {
Expand Down
Loading

0 comments on commit dd43519

Please sign in to comment.