Skip to content

Commit

Permalink
extract common code
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Mar 1, 2024
1 parent 355b1f9 commit 385128b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
import com.datastrato.gravitino.storage.relational.utils.SessionUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import com.datastrato.gravitino.exceptions.NonEmptyEntityException;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
import com.datastrato.gravitino.storage.relational.po.SchemaPO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
import com.datastrato.gravitino.storage.relational.po.TablePO;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
Expand All @@ -35,54 +32,49 @@ public static TableMetaService getInstance() {

private TableMetaService() {}

public TableEntity getTableByIdentifier(NameIdentifier identifier) {
NameIdentifier.checkTable(identifier);
String metalakeName = identifier.namespace().level(0);
String catalogName = identifier.namespace().level(1);
String schemaName = identifier.namespace().level(2);
Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName));
if (metalakeId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.METALAKE.name().toLowerCase(),
metalakeName);
}

Long catalogId =
public TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) {
TablePO tablePO =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName));
if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
String.format("%s.%s", metalakeName, catalogName));
}
TableMetaMapper.class,
mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, tableName));

Long schemaId =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName));
if (schemaId == null) {
if (tablePO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
identifier.namespace().toString());
Entity.EntityType.TABLE.name().toLowerCase(),
tableName);
}
return tablePO;
}

TablePO tablePO =
public Long getTableIdBySchemaIdAndName(Long schemaId, String tableName) {
Long tableId =
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, identifier.name()));
mapper -> mapper.selectTableIdBySchemaIdAndName(schemaId, tableName));

if (tablePO == null) {
if (tableId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.TABLE.name().toLowerCase(),
identifier.toString());
tableName);
}
return tableId;
}

public TableEntity getTableByIdentifier(NameIdentifier identifier) {
NameIdentifier.checkTable(identifier);
String metalakeName = identifier.namespace().level(0);
String catalogName = identifier.namespace().level(1);
String schemaName = identifier.namespace().level(2);

Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
Long catalogId =
CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
Long schemaId =
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, schemaName);

TablePO tablePO = getTablePOBySchemaIdAndName(schemaId, identifier.name());

return POConverters.fromTablePO(tablePO, identifier.namespace());
}
Expand All @@ -93,41 +85,16 @@ public List<TableEntity> listTablesByNamespace(Namespace namespace) {
String catalogName = namespace.level(1);
String schemaName = namespace.level(2);

Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName));
if (metalakeId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.METALAKE.name().toLowerCase(),
metalakeName);
}

Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
Long catalogId =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName));
if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
String.format("%s.%s", metalakeName, catalogName));
}

CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
Long schemaId =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName));
if (schemaId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
namespace.toString());
}
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, schemaName);

List<TablePO> tablePOs =
SessionUtils.getWithoutCommit(
TableMetaMapper.class, mapper -> mapper.listTablePOsBySchemaId(schemaId));

return POConverters.fromTablePOs(tablePOs, namespace);
}

Expand All @@ -137,37 +104,12 @@ public void insertTable(TableEntity tableEntity, boolean overwrite) {
String metalakeName = tableEntity.namespace().level(0);
String catalogName = tableEntity.namespace().level(1);
String schemaName = tableEntity.namespace().level(2);
Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName));
if (metalakeId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.METALAKE.name().toLowerCase(),
metalakeName);
}

Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
Long catalogId =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName));
if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
String.format("%s.%s", metalakeName, catalogName));
}

CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
Long schemaId =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName));
if (schemaId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
tableEntity.namespace().toString());
}
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, schemaName);

SessionUtils.doWithCommit(
TableMetaMapper.class,
Expand Down Expand Up @@ -204,56 +146,21 @@ public <E extends Entity & HasIdentifier> TableEntity updateTable(
String schemaName = identifier.namespace().level(2);
String tableName = identifier.name();

Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName));
if (metalakeId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.METALAKE.name().toLowerCase(),
metalakeName);
}

Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
Long catalogId =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName));
if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
String.format("%s.%s", metalakeName, catalogName));
}

CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
Long schemaId =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName));
if (schemaId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
identifier.namespace().toString());
}

TablePO oldTablePO =
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, tableName));
if (oldTablePO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.TABLE.name().toLowerCase(),
identifier.toString());
}
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, schemaName);

TablePO oldTablePO = getTablePOBySchemaIdAndName(schemaId, tableName);
TableEntity oldTableEntity = POConverters.fromTablePO(oldTablePO, identifier.namespace());
TableEntity newEntity = (TableEntity) updater.apply((E) oldTableEntity);
Preconditions.checkArgument(
Objects.equals(oldTableEntity.id(), newEntity.id()),
"The updated table entity id: %s should be same with the table entity id before: %s",
newEntity.id(),
oldTableEntity.id());

Integer updateResult;
try {
updateResult =
Expand Down Expand Up @@ -291,51 +198,17 @@ public boolean deleteTable(NameIdentifier identifier) {
String catalogName = identifier.namespace().level(1);
String schemaName = identifier.namespace().level(2);
String tableName = identifier.name();
Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName));
if (metalakeId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.METALAKE.name().toLowerCase(),
metalakeName);
}

Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
Long catalogId =
SessionUtils.getWithoutCommit(
CatalogMetaMapper.class,
mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName));
if (catalogId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.CATALOG.name().toLowerCase(),
String.format("%s.%s", metalakeName, catalogName));
}

CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, catalogName);
Long schemaId =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId, schemaName));
if (schemaId == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
identifier.namespace().toString());
}
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, schemaName);
Long tableId = getTableIdBySchemaIdAndName(schemaId, tableName);

SessionUtils.doWithCommit(
TableMetaMapper.class, mapper -> mapper.softDeleteTableMetasByTableId(tableId));

Long tableId =
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.selectTableIdBySchemaIdAndName(schemaId, tableName));
if (tableId != null) {
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
TableMetaMapper.class, mapper -> mapper.softDeleteTableMetasByTableId(tableId)),
() -> {
// TODO We will delete the sub-resources under the table
});
}
return true;
}
}

0 comments on commit 385128b

Please sign in to comment.