Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiebao Xiao committed Feb 26, 2024
1 parent e5c25f2 commit 7760554
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.exceptions;
package com.datastrato.gravitino;

import com.datastrato.gravitino.exceptions.GravitinoRuntimeException;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational;

import com.datastrato.gravitino.Config;
Expand All @@ -11,20 +12,14 @@
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.UnsupportedEntityTypeException;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.exceptions.UnsupportedEntityTypeException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relational.po.MetalakePO;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
import com.datastrato.gravitino.storage.relational.utils.SessionUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/**
Expand All @@ -44,60 +39,30 @@ public void initialize(Config config) {
@Override
public <E extends Entity & HasIdentifier> List<E> list(
Namespace namespace, Entity.EntityType entityType) {
if (entityType == Entity.EntityType.METALAKE) {
List<MetalakePO> metalakePOS =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, MetalakeMetaMapper::listMetalakePOs);
return (List<E>) POConverters.fromMetalakePOs(metalakePOS);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
switch (entityType) {
case METALAKE:
return (List<E>) MetalakeMetaService.getInstance().listMetalakes();
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
}
}

@Override
public boolean exists(NameIdentifier ident, Entity.EntityType entityType) {
if (entityType == Entity.EntityType.METALAKE) {
MetalakePO metalakePO =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name()));
return metalakePO != null;
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for exists operation", entityType);
try {
Entity entity = get(ident, entityType);
return entity != null;
} catch (NoSuchEntityException ne) {
return false;
}
}

@Override
public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
throws EntityAlreadyExistsException {
if (e instanceof BaseMetalake) {
try {
SessionUtils.doWithCommit(
MetalakeMetaMapper.class,
mapper -> {
MetalakePO po =
POConverters.initializeMetalakePOVersion(
POConverters.toMetalakePO((BaseMetalake) e));
if (overwritten) {
mapper.insertMetalakeMetaOnDuplicateKeyUpdate(po);
} else {
mapper.insertMetalakeMeta(po);
}
});
} catch (RuntimeException re) {
if (re.getCause() != null
&& re.getCause().getCause() != null
&& re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) {
// TODO We should make more fine-grained exception judgments
// Usually throwing `SQLIntegrityConstraintViolationException` means that
// SQL violates the constraints of `primary key` and `unique key`.
// We simply think that the entity already exists at this time.
throw new EntityAlreadyExistsException(
String.format("Metalake entity: %s already exists", e.nameIdentifier().name()));
}
throw re;
}
MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -108,86 +73,35 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
public <E extends Entity & HasIdentifier> E update(
NameIdentifier ident, Entity.EntityType entityType, Function<E, E> updater)
throws IOException, NoSuchEntityException, AlreadyExistsException {
if (entityType == Entity.EntityType.METALAKE) {
MetalakePO oldMetalakePO =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name()));
if (oldMetalakePO == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}

BaseMetalake oldMetalakeEntity = POConverters.fromMetalakePO(oldMetalakePO);
BaseMetalake newMetalakeEntity = (BaseMetalake) updater.apply((E) oldMetalakeEntity);
Preconditions.checkArgument(
Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()),
"The updated metalake entity id: %s should same with the metalake entity id before: %s",
newMetalakeEntity.id(),
oldMetalakeEntity.id());
MetalakePO newMetalakePO =
POConverters.updateMetalakePOVersion(
oldMetalakePO, POConverters.toMetalakePO(newMetalakeEntity));

Integer updateResult =
SessionUtils.doWithCommitAndFetchResult(
MetalakeMetaMapper.class,
mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO));
if (updateResult > 0) {
return (E) newMetalakeEntity;
} else {
throw new IOException("Failed to update the entity:" + ident);
}
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
}
}

@Override
public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType)
throws NoSuchEntityException, IOException {
if (entityType == Entity.EntityType.METALAKE) {
MetalakePO metalakePO =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name()));
if (metalakePO == null) {
throw new NoSuchEntityException("No such entity:%s", ident.toString());
}
return (E) POConverters.fromMetalakePO(metalakePO);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
NameIdentifier ident, Entity.EntityType entityType) throws NoSuchEntityException {
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().getMetalakeByIdent(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
}
}

@Override
public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade) {
if (entityType == Entity.EntityType.METALAKE) {
Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(ident.name()));
if (metalakeId != null) {
if (cascade) {
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
MetalakeMetaMapper.class,
mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId)),
() -> {
// TODO We will cascade delete the metadata of sub-resources under the metalake
});
} else {
// TODO Check whether the sub-resources are empty. If the sub-resources are not empty,
// deletion is not allowed.
SessionUtils.doWithCommit(
MetalakeMetaMapper.class,
mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId));
}
}
return true;
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
switch (entityType) {
case METALAKE:
return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.service;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import com.datastrato.gravitino.storage.relational.po.MetalakePO;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
import com.datastrato.gravitino.storage.relational.utils.SessionUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class MetalakeMetaService {
private static final MetalakeMetaService INSTANCE = new MetalakeMetaService();

public static MetalakeMetaService getInstance() {
return INSTANCE;
}

private MetalakeMetaService() {}

public List<BaseMetalake> listMetalakes() {
List<MetalakePO> metalakePOS =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, MetalakeMetaMapper::listMetalakePOs);
return POConverters.fromMetalakePOs(metalakePOS);
}

public BaseMetalake getMetalakeByIdent(NameIdentifier ident) {
MetalakePO metalakePO =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name()));
if (metalakePO == null) {
throw new NoSuchEntityException("No such entity: %s", ident.toString());
}
return POConverters.fromMetalakePO(metalakePO);
}

public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) {
try {
SessionUtils.doWithCommit(
MetalakeMetaMapper.class,
mapper -> {
MetalakePO po = POConverters.initializeMetalakePOWithVersion(baseMetalake);
if (overwrite) {
mapper.insertMetalakeMetaOnDuplicateKeyUpdate(po);
} else {
mapper.insertMetalakeMeta(po);
}
});
} catch (RuntimeException re) {
if (re.getCause() != null
&& re.getCause().getCause() != null
&& re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) {
// TODO We should make more fine-grained exception judgments
// Usually throwing `SQLIntegrityConstraintViolationException` means that
// SQL violates the constraints of `primary key` and `unique key`.
// We simply think that the entity already exists at this time.
throw new EntityAlreadyExistsException(
String.format(
"Metalake entity: %s already exists", baseMetalake.nameIdentifier().name()));
}
throw re;
}
}

public <E extends Entity & HasIdentifier> BaseMetalake updateMetalake(
NameIdentifier ident, Function<E, E> updater) throws IOException {
MetalakePO oldMetalakePO =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name()));
if (oldMetalakePO == null) {
throw new NoSuchEntityException("No such entity: %s", ident.toString());
}

BaseMetalake oldMetalakeEntity = POConverters.fromMetalakePO(oldMetalakePO);
BaseMetalake newMetalakeEntity = (BaseMetalake) updater.apply((E) oldMetalakeEntity);
Preconditions.checkArgument(
Objects.equals(oldMetalakeEntity.id(), newMetalakeEntity.id()),
"The updated metalake entity id: %s should be same with the metalake entity id before: %s",
newMetalakeEntity.id(),
oldMetalakeEntity.id());
MetalakePO newMetalakePO =
POConverters.updateMetalakePOWithVersion(oldMetalakePO, newMetalakeEntity);

Integer updateResult =
SessionUtils.doWithCommitAndFetchResult(
MetalakeMetaMapper.class,
mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO));
if (updateResult > 0) {
return newMetalakeEntity;
} else {
throw new IOException("Failed to update the entity: " + ident);
}
}

public boolean deleteMetalake(NameIdentifier ident, boolean cascade) {
Long metalakeId =
SessionUtils.getWithoutCommit(
MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(ident.name()));
if (metalakeId != null) {
if (cascade) {
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
MetalakeMetaMapper.class,
mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId)),
() -> {
// TODO We will cascade delete the metadata of sub-resources under the metalake
});
} else {
// TODO Check whether the sub-resources are empty. If the sub-resources are not empty,
// deletion is not allowed.
SessionUtils.doWithCommit(
MetalakeMetaMapper.class,
mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId));
}
}
return true;
}
}
Loading

0 comments on commit 7760554

Please sign in to comment.