Skip to content

Commit

Permalink
[#4018] feat(core): Add tag management logic for Tag System (Part 1) (#…
Browse files Browse the repository at this point in the history
…4019)

### What changes were proposed in this pull request?

This PR tracks the work of adding the core logics for tag management.

### Why are the changes needed?

This is a part of work for adding tag support in Gravitino.

Fix: #4018 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UTs added.
  • Loading branch information
jerryshao authored Jul 9, 2024
1 parent 27ca875 commit 178eb37
Show file tree
Hide file tree
Showing 21 changed files with 1,609 additions and 51 deletions.
16 changes: 0 additions & 16 deletions core/src/main/java/com/datastrato/gravitino/meta/TagEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.Field;
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.MetadataObject;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.tag.Tag;
import com.google.common.collect.Maps;
Expand All @@ -47,10 +46,6 @@ public class TagEntity implements Tag, Entity, Auditable, HasIdentifier {
public static final Field PROPERTIES =
Field.optional("properties", Map.class, "The properties of the tag entity.");

public static final Field ASSOCIATED_OBJECTS =
Field.optional(
"objects", MetadataObject[].class, "The associated objects of the tag entity.");

public static final Field AUDIT_INFO =
Field.required("audit_info", Audit.class, "The audit details of the tag entity.");

Expand All @@ -59,7 +54,6 @@ public class TagEntity implements Tag, Entity, Auditable, HasIdentifier {
private Namespace namespace;
private String comment;
private Map<String, String> properties;
private MetadataObject[] objects = null;
private Audit auditInfo;

private TagEntity() {}
Expand All @@ -72,7 +66,6 @@ public Map<Field, Object> fields() {
fields.put(COMMENT, comment);
fields.put(PROPERTIES, properties);
fields.put(AUDIT_INFO, auditInfo);
fields.put(ASSOCIATED_OBJECTS, objects);

return Collections.unmodifiableMap(fields);
}
Expand Down Expand Up @@ -112,10 +105,6 @@ public Optional<Boolean> inherited() {
return Optional.empty();
}

public MetadataObject[] objects() {
return objects;
}

@Override
public Audit auditInfo() {
return auditInfo;
Expand Down Expand Up @@ -181,11 +170,6 @@ public Builder withProperties(Map<String, String> properties) {
return this;
}

public Builder withMetadataObjects(MetadataObject[] objects) {
tagEntity.objects = objects;
return this;
}

public Builder withAuditInfo(Audit auditInfo) {
tagEntity.auditInfo = auditInfo;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.datastrato.gravitino.meta.RoleEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.meta.TagEntity;
import com.datastrato.gravitino.meta.TopicEntity;
import com.datastrato.gravitino.meta.UserEntity;
import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory;
Expand All @@ -48,6 +49,7 @@
import com.datastrato.gravitino.storage.relational.service.RoleMetaService;
import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
import com.datastrato.gravitino.storage.relational.service.TableMetaService;
import com.datastrato.gravitino.storage.relational.service.TagMetaService;
import com.datastrato.gravitino.storage.relational.service.TopicMetaService;
import com.datastrato.gravitino.storage.relational.service.UserMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
Expand Down Expand Up @@ -96,6 +98,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
return (List<E>) FilesetMetaService.getInstance().listFilesetsByNamespace(namespace);
case TOPIC:
return (List<E>) TopicMetaService.getInstance().listTopicsByNamespace(namespace);
case TAG:
return (List<E>) TagMetaService.getInstance().listTagsByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand Down Expand Up @@ -133,6 +137,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
RoleMetaService.getInstance().insertRole((RoleEntity) e, overwritten);
} else if (e instanceof GroupEntity) {
GroupMetaService.getInstance().insertGroup((GroupEntity) e, overwritten);
} else if (e instanceof TagEntity) {
TagMetaService.getInstance().insertTag((TagEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand Down Expand Up @@ -160,6 +166,8 @@ public <E extends Entity & HasIdentifier> E update(
return (E) UserMetaService.getInstance().updateUser(ident, updater);
case GROUP:
return (E) GroupMetaService.getInstance().updateGroup(ident, updater);
case TAG:
return (E) TagMetaService.getInstance().updateTag(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand Down Expand Up @@ -189,6 +197,8 @@ public <E extends Entity & HasIdentifier> E get(
return (E) GroupMetaService.getInstance().getGroupByIdentifier(ident);
case ROLE:
return (E) RoleMetaService.getInstance().getRoleByIdentifier(ident);
case TAG:
return (E) TagMetaService.getInstance().getTagByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand Down Expand Up @@ -217,6 +227,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
return GroupMetaService.getInstance().deleteGroup(ident);
case ROLE:
return RoleMetaService.getInstance().deleteRole(ident);
case TAG:
return TagMetaService.getInstance().deleteTag(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down Expand Up @@ -264,6 +276,9 @@ public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimelin
.deleteRoleMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case TAG:
return TagMetaService.getInstance()
.deleteTagMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case COLUMN:
case AUDIT:
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.TagPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

public interface TagMetaMapper {

String TAG_TABLE_NAME = "tag_meta";

@Select(
"SELECT tm.tag_id as tagId, tag_name as tagName,"
+ " tm.metalake_id as metalakeId,"
+ " tm.tag_comment as comment,"
+ " tm.properties as properties,"
+ " tm.audit_info as auditInfo,"
+ " tm.current_version as currentVersion,"
+ " tm.last_version as lastVersion,"
+ " tm.deleted_at as deletedAt"
+ " FROM "
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm on tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.deleted_at = 0 AND mm.deleted_at = 0")
List<TagPO> listTagPOsByMetalake(@Param("metalakeName") String metalakeName);

@Select(
"SELECT tm.tag_id as tagId FROM "
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm on tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+ " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
Long selectTagIdByMetalakeAndName(
@Param("metalakeName") String metalakeName, @Param("tagName") String tagName);

@Select(
"SELECT tm.tag_id as tagId, tm.tag_name as tagName,"
+ " tm.metalake_id as metalakeId,"
+ " tm.tag_comment as comment,"
+ " tm.properties as properties,"
+ " tm.audit_info as auditInfo,"
+ " tm.current_version as currentVersion,"
+ " tm.last_version as lastVersion,"
+ " tm.deleted_at as deletedAt"
+ " FROM "
+ TAG_TABLE_NAME
+ " tm JOIN "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm on tm.metalake_id = mm.metalake_id"
+ " WHERE mm.metalake_name = #{metalakeName} AND tm.tag_name = #{tagName}"
+ " AND tm.deleted_at = 0 AND mm.deleted_at = 0")
TagPO selectTagMetaByMetalakeAndName(
@Param("metalakeName") String metalakeName, @Param("tagName") String tagName);

@Insert(
"INSERT INTO "
+ TAG_TABLE_NAME
+ " (tag_id, tag_name,"
+ " metalake_id, tag_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{tagMeta.tagId},"
+ " #{tagMeta.tagName},"
+ " #{tagMeta.metalakeId},"
+ " #{tagMeta.comment},"
+ " #{tagMeta.properties},"
+ " #{tagMeta.auditInfo},"
+ " #{tagMeta.currentVersion},"
+ " #{tagMeta.lastVersion},"
+ " #{tagMeta.deletedAt}"
+ " )")
void insertTagMeta(@Param("tagMeta") TagPO tagPO);

@Insert(
"INSERT INTO "
+ TAG_TABLE_NAME
+ "(tag_id, tag_name,"
+ " metalake_id, tag_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{tagMeta.tagId},"
+ " #{tagMeta.tagName},"
+ " #{tagMeta.metalakeId},"
+ " #{tagMeta.comment},"
+ " #{tagMeta.properties},"
+ " #{tagMeta.auditInfo},"
+ " #{tagMeta.currentVersion},"
+ " #{tagMeta.lastVersion},"
+ " #{tagMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " tag_name = #{tagMeta.tagName},"
+ " metalake_id = #{tagMeta.metalakeId},"
+ " tag_comment = #{tagMeta.comment},"
+ " properties = #{tagMeta.properties},"
+ " audit_info = #{tagMeta.auditInfo},"
+ " current_version = #{tagMeta.currentVersion},"
+ " last_version = #{tagMeta.lastVersion},"
+ " deleted_at = #{tagMeta.deletedAt}")
void insertTagMetaOnDuplicateKeyUpdate(@Param("tagMeta") TagPO tagPO);

@Update(
"UPDATE "
+ TAG_TABLE_NAME
+ " SET tag_name = #{newTagMeta.tagName},"
+ " tag_comment = #{newTagMeta.comment},"
+ " properties = #{newTagMeta.properties},"
+ " audit_info = #{newTagMeta.auditInfo},"
+ " current_version = #{newTagMeta.currentVersion},"
+ " last_version = #{newTagMeta.lastVersion},"
+ " deleted_at = #{newTagMeta.deletedAt}"
+ " WHERE tag_id = #{oldTagMeta.tagId}"
+ " AND metalake_id = #{oldTagMeta.metalakeId}"
+ " AND tag_name = #{oldTagMeta.tagName}"
+ " AND tag_comment = #{oldTagMeta.comment}"
+ " AND properties = #{oldTagMeta.properties}"
+ " AND audit_info = #{oldTagMeta.auditInfo}"
+ " AND current_version = #{oldTagMeta.currentVersion}"
+ " AND last_version = #{oldTagMeta.lastVersion}"
+ "AND deleted_at = 0")
Integer updateTagMeta(@Param("newTagMeta") TagPO newTagPO, @Param("oldTagMeta") TagPO oldTagPO);

@Update(
"UPDATE "
+ TAG_TABLE_NAME
+ " tm SET tm.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE tm.metalake_id IN ("
+ " SELECT mm.metalake_id FROM "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+ " AND tm.tag_name = #{tagName} AND tm.deleted_at = 0")
Integer softDeleteTagMetaByMetalakeAndTagName(
@Param("metalakeName") String metalakeName, @Param("tagName") String tagName);

@Update(
"UPDATE "
+ TAG_TABLE_NAME
+ " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
void softDeleteTagMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

@Delete(
"DELETE FROM "
+ TAG_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}")
Integer deleteTagMetasByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datastrato.gravitino.storage.relational.mapper;

import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

public interface TagMetadataObjectRelMapper {
String TAG_METADATA_OBJECT_RELATION_TABLE_NAME = "tag_relation_meta";

@Update(
"UPDATE "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE tmo.tag_id IN (SELECT tm.tag_id FROM "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm WHERE tm.metalake_id IN (SELECT mm.metalake_id FROM "
+ MetalakeMetaMapper.TABLE_NAME
+ " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+ " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
Integer softDeleteTagMetadataObjectRelsByMetalakeAndTagName(
@Param("metalakeName") String metalakeName, @Param("tagName") String tagName);

@Update(
"UPDATE "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " tmo SET tmo.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ " WHERE EXISTS (SELECT * FROM "
+ TagMetaMapper.TAG_TABLE_NAME
+ " tm WHERE tm.metalake_id = #{metalakeId} AND tm.tag_id = tmo.tag_id"
+ " AND tm.deleted_at = 0) AND tmo.deleted_at = 0")
void softDeleteTagMetadataObjectRelsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Delete(
"DELETE FROM "
+ TAG_METADATA_OBJECT_RELATION_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}")
Integer deleteTagEntityRelsByLegacyTimeline(
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
}
Loading

0 comments on commit 178eb37

Please sign in to comment.