Skip to content

Commit

Permalink
enhance schema job module (#1729)
Browse files Browse the repository at this point in the history
Change-Id: I32938d4a5ed595ceb0f2dc02eb7ee75141f4133a
  • Loading branch information
javeme authored Jan 12, 2022
1 parent 6924f36 commit 60672b2
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.exception.NotAllowException;
import com.baidu.hugegraph.job.JobBuilder;
import com.baidu.hugegraph.job.schema.EdgeLabelRemoveCallable;
import com.baidu.hugegraph.job.schema.IndexLabelRemoveCallable;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyClearCallable;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyCreateCallable;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyRemoveCallable;
import com.baidu.hugegraph.job.schema.RebuildIndexCallable;
import com.baidu.hugegraph.job.schema.SchemaCallable;
import com.baidu.hugegraph.job.schema.VertexLabelRemoveCallable;
import com.baidu.hugegraph.job.schema.EdgeLabelRemoveJob;
import com.baidu.hugegraph.job.schema.IndexLabelRebuildJob;
import com.baidu.hugegraph.job.schema.IndexLabelRemoveJob;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyClearJob;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyCreateJob;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyRemoveJob;
import com.baidu.hugegraph.job.schema.SchemaJob;
import com.baidu.hugegraph.job.schema.VertexLabelRemoveJob;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
Expand Down Expand Up @@ -200,7 +200,7 @@ public VertexLabel getVertexLabel(String name) {
@Watched(prefix = "schema")
public Id removeVertexLabel(Id id) {
LOG.debug("SchemaTransaction remove vertex label '{}'", id);
SchemaCallable callable = new VertexLabelRemoveCallable();
SchemaJob callable = new VertexLabelRemoveJob();
VertexLabel schema = this.getVertexLabel(id);
return asyncRun(this.graph(), schema, callable);
}
Expand All @@ -226,7 +226,7 @@ public EdgeLabel getEdgeLabel(String name) {
@Watched(prefix = "schema")
public Id removeEdgeLabel(Id id) {
LOG.debug("SchemaTransaction remove edge label '{}'", id);
SchemaCallable callable = new EdgeLabelRemoveCallable();
SchemaJob callable = new EdgeLabelRemoveJob();
EdgeLabel schema = this.getEdgeLabel(id);
return asyncRun(this.graph(), schema, callable);
}
Expand Down Expand Up @@ -262,7 +262,7 @@ public IndexLabel getIndexLabel(String name) {
@Watched(prefix = "schema")
public Id removeIndexLabel(Id id) {
LOG.debug("SchemaTransaction remove index label '{}'", id);
SchemaCallable callable = new IndexLabelRemoveCallable();
SchemaJob callable = new IndexLabelRemoveJob();
IndexLabel schema = this.getIndexLabel(id);
return asyncRun(this.graph(), schema, callable);
}
Expand All @@ -276,7 +276,7 @@ public Id rebuildIndex(SchemaElement schema) {
public Id rebuildIndex(SchemaElement schema, Set<Id> dependencies) {
LOG.debug("SchemaTransaction rebuild index for {} with id '{}'",
schema.type(), schema.id());
SchemaCallable callable = new RebuildIndexCallable();
SchemaJob callable = new IndexLabelRebuildJob();
return asyncRun(this.graph(), schema, callable, dependencies);
}

Expand Down Expand Up @@ -305,21 +305,21 @@ public void createIndexLabelForOlapPk(PropertyKey propertyKey) {
public Id createOlapPk(PropertyKey propertyKey) {
LOG.debug("SchemaTransaction create olap property key {} with id '{}'",
propertyKey.name(), propertyKey.id());
SchemaCallable callable = new OlapPropertyKeyCreateCallable();
SchemaJob callable = new OlapPropertyKeyCreateJob();
return asyncRun(this.graph(), propertyKey, callable);
}

public Id clearOlapPk(PropertyKey propertyKey) {
LOG.debug("SchemaTransaction clear olap property key {} with id '{}'",
propertyKey.name(), propertyKey.id());
SchemaCallable callable = new OlapPropertyKeyClearCallable();
SchemaJob callable = new OlapPropertyKeyClearJob();
return asyncRun(this.graph(), propertyKey, callable);
}

public Id removeOlapPk(PropertyKey propertyKey) {
LOG.debug("SchemaTransaction remove olap property key {} with id '{}'",
propertyKey.name(), propertyKey.id());
SchemaCallable callable = new OlapPropertyKeyRemoveCallable();
SchemaJob callable = new OlapPropertyKeyRemoveJob();
return asyncRun(this.graph(), propertyKey, callable);
}

Expand Down Expand Up @@ -543,17 +543,17 @@ private static void setCreateTimeIfNeeded(SchemaElement schema) {
}

private static Id asyncRun(HugeGraph graph, SchemaElement schema,
SchemaCallable callable) {
SchemaJob callable) {
return asyncRun(graph, schema, callable, ImmutableSet.of());
}

@Watched(prefix = "schema")
private static Id asyncRun(HugeGraph graph, SchemaElement schema,
SchemaCallable callable, Set<Id> dependencies) {
SchemaJob callable, Set<Id> dependencies) {
E.checkArgument(schema != null, "Schema can't be null");
String name = SchemaCallable.formatTaskName(schema.type(),
schema.id(),
schema.name());
String name = SchemaJob.formatTaskName(schema.type(),
schema.id(),
schema.name());

JobBuilder<Object> builder = JobBuilder.of(graph).name(name)
.job(callable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class EdgeLabelRemoveCallable extends SchemaCallable {
public class EdgeLabelRemoveJob extends SchemaJob {

@Override
public String type() {
Expand All @@ -43,15 +43,19 @@ public Object execute() {
return null;
}

protected static void removeEdgeLabel(HugeGraphParams graph, Id id) {
private static void removeEdgeLabel(HugeGraphParams graph, Id id) {
GraphTransaction graphTx = graph.graphTransaction();
SchemaTransaction schemaTx = graph.schemaTransaction();
EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id);
// If the edge label does not exist, return directly
if (edgeLabel == null) {
return;
}
// TODO: use event to replace direct call
if (edgeLabel.status().deleting()) {
LOG.info("The edge label '{}' has been in {} status, " +
"please check if it's expected to delete it again",
edgeLabel, edgeLabel.status());
}
// Remove index related data(include schema) of this edge label
Set<Id> indexIds = ImmutableSet.copyOf(edgeLabel.indexLabels());
LockUtil.Locks locks = new LockUtil.Locks(graph.name());
Expand All @@ -60,16 +64,18 @@ protected static void removeEdgeLabel(HugeGraphParams graph, Id id) {
schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.DELETING);
try {
for (Id indexId : indexIds) {
IndexLabelRemoveCallable.removeIndexLabel(graph, indexId);
IndexLabelRemoveJob.removeIndexLabel(graph, indexId);
}
// Remove all edges which has matched label
// TODO: use event to replace direct call
graphTx.removeEdges(edgeLabel);
removeSchema(schemaTx, edgeLabel);
/*
* Should commit changes to backend store before release
* delete lock
*/
graph.graph().tx().commit();
// Remove edge label
removeSchema(schemaTx, edgeLabel);
} catch (Throwable e) {
schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.UNDELETED);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class RebuildIndexCallable extends SchemaCallable {
public class IndexLabelRebuildJob extends SchemaJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.baidu.hugegraph.type.define.SchemaStatus;
import com.baidu.hugegraph.util.LockUtil;

public class IndexLabelRemoveCallable extends SchemaCallable {
public class IndexLabelRemoveJob extends SchemaJob {

@Override
public String type() {
Expand All @@ -48,6 +48,11 @@ protected static void removeIndexLabel(HugeGraphParams graph, Id id) {
if (indexLabel == null) {
return;
}
if (indexLabel.status().deleting()) {
LOG.info("The index label '{}' has been in {} status, " +
"please check if it's expected to delete it again",
indexLabel, indexLabel.status());
}
LockUtil.Locks locks = new LockUtil.Locks(graph.name());
try {
locks.lockWrites(LockUtil.INDEX_LABEL_DELETE, id);
Expand All @@ -60,14 +65,15 @@ protected static void removeIndexLabel(HugeGraphParams graph, Id id) {
// Remove index data
// TODO: use event to replace direct call
graphTx.removeIndex(indexLabel);
removeSchema(schemaTx, indexLabel);
/*
* Should commit changes to backend store before release
* delete lock
*/
graph.graph().tx().commit();
// Remove index label
removeSchema(schemaTx, indexLabel);
} catch (Throwable e) {
schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.INVALID);
schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.UNDELETED);
throw e;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.baidu.hugegraph.type.define.SchemaStatus;
import com.baidu.hugegraph.util.LockUtil;

public class OlapPropertyKeyClearCallable extends IndexLabelRemoveCallable {
public class OlapPropertyKeyClearJob extends IndexLabelRemoveJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
import com.baidu.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyCreateCallable extends SchemaCallable {
public class OlapPropertyKeyCreateJob extends SchemaJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
import com.baidu.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyRemoveCallable
extends OlapPropertyKeyClearCallable {
public class OlapPropertyKeyRemoveJob extends OlapPropertyKeyClearJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.slf4j.Logger;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
Expand All @@ -13,8 +15,9 @@
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;

public abstract class SchemaCallable extends SysJob<Object> {
public abstract class SchemaJob extends SysJob<Object> {

public static final String REMOVE_SCHEMA = "remove_schema";
public static final String REBUILD_INDEX = "rebuild_index";
Expand All @@ -23,6 +26,8 @@ public abstract class SchemaCallable extends SysJob<Object> {
public static final String CLEAR_OLAP = "clear_olap";
public static final String REMOVE_OLAP = "remove_olap";

protected static final Logger LOG = Log.logger(SchemaJob.class);

private static final String SPLITOR = ":";

protected HugeType schemaType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class VertexLabelRemoveCallable extends SchemaCallable {
public class VertexLabelRemoveJob extends SchemaJob {

@Override
public String type() {
Expand All @@ -54,7 +54,13 @@ private static void removeVertexLabel(HugeGraphParams graph, Id id) {
if (vertexLabel == null) {
return;
}
if (vertexLabel.status().deleting()) {
LOG.info("The vertex label '{}' has been in {} status, " +
"please check if it's expected to delete it again",
vertexLabel, vertexLabel.status());
}

// Check no edge label use the vertex label
List<EdgeLabel> edgeLabels = schemaTx.getEdgeLabels();
for (EdgeLabel edgeLabel : edgeLabels) {
if (edgeLabel.linkWithLabel(id)) {
Expand All @@ -76,19 +82,21 @@ private static void removeVertexLabel(HugeGraphParams graph, Id id) {
schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.DELETING);
try {
for (Id ilId : indexLabelIds) {
IndexLabelRemoveCallable.removeIndexLabel(graph, ilId);
IndexLabelRemoveJob.removeIndexLabel(graph, ilId);
}
// TODO: use event to replace direct call
// Deleting a vertex will automatically deletes the held edge
graphTx.removeVertices(vertexLabel);
removeSchema(schemaTx, vertexLabel);
/*
* Should commit changes to backend store before release
* delete lock
*/
graph.graph().tx().commit();
// Remove vertex label
removeSchema(schemaTx, vertexLabel);
} catch (Throwable e) {
schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.UNDELETED);
schemaTx.updateSchemaStatus(vertexLabel,
SchemaStatus.UNDELETED);
throw e;
}
} finally {
Expand Down

0 comments on commit 60672b2

Please sign in to comment.