Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance schema job module #1729

Merged
merged 2 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.exception.NotAllowException;
import com.baidu.hugegraph.job.JobBuilder;
import com.baidu.hugegraph.job.schema.EdgeLabelRemoveCallable;
import com.baidu.hugegraph.job.schema.IndexLabelRemoveCallable;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyClearCallable;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyCreateCallable;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyRemoveCallable;
import com.baidu.hugegraph.job.schema.RebuildIndexCallable;
import com.baidu.hugegraph.job.schema.SchemaCallable;
import com.baidu.hugegraph.job.schema.VertexLabelRemoveCallable;
import com.baidu.hugegraph.job.schema.EdgeLabelRemoveJob;
import com.baidu.hugegraph.job.schema.IndexLabelRebuildJob;
import com.baidu.hugegraph.job.schema.IndexLabelRemoveJob;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyClearJob;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyCreateJob;
import com.baidu.hugegraph.job.schema.OlapPropertyKeyRemoveJob;
import com.baidu.hugegraph.job.schema.SchemaJob;
import com.baidu.hugegraph.job.schema.VertexLabelRemoveJob;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
Expand Down Expand Up @@ -200,7 +200,7 @@ public VertexLabel getVertexLabel(String name) {
@Watched(prefix = "schema")
public Id removeVertexLabel(Id id) {
LOG.debug("SchemaTransaction remove vertex label '{}'", id);
SchemaCallable callable = new VertexLabelRemoveCallable();
SchemaJob callable = new VertexLabelRemoveJob();
VertexLabel schema = this.getVertexLabel(id);
return asyncRun(this.graph(), schema, callable);
}
Expand All @@ -226,7 +226,7 @@ public EdgeLabel getEdgeLabel(String name) {
@Watched(prefix = "schema")
public Id removeEdgeLabel(Id id) {
LOG.debug("SchemaTransaction remove edge label '{}'", id);
SchemaCallable callable = new EdgeLabelRemoveCallable();
SchemaJob callable = new EdgeLabelRemoveJob();
EdgeLabel schema = this.getEdgeLabel(id);
return asyncRun(this.graph(), schema, callable);
}
Expand Down Expand Up @@ -262,7 +262,7 @@ public IndexLabel getIndexLabel(String name) {
@Watched(prefix = "schema")
public Id removeIndexLabel(Id id) {
LOG.debug("SchemaTransaction remove index label '{}'", id);
SchemaCallable callable = new IndexLabelRemoveCallable();
SchemaJob callable = new IndexLabelRemoveJob();
IndexLabel schema = this.getIndexLabel(id);
return asyncRun(this.graph(), schema, callable);
}
Expand All @@ -276,7 +276,7 @@ public Id rebuildIndex(SchemaElement schema) {
public Id rebuildIndex(SchemaElement schema, Set<Id> dependencies) {
LOG.debug("SchemaTransaction rebuild index for {} with id '{}'",
schema.type(), schema.id());
SchemaCallable callable = new RebuildIndexCallable();
SchemaJob callable = new IndexLabelRebuildJob();
return asyncRun(this.graph(), schema, callable, dependencies);
}

Expand Down Expand Up @@ -305,21 +305,21 @@ public void createIndexLabelForOlapPk(PropertyKey propertyKey) {
public Id createOlapPk(PropertyKey propertyKey) {
LOG.debug("SchemaTransaction create olap property key {} with id '{}'",
propertyKey.name(), propertyKey.id());
SchemaCallable callable = new OlapPropertyKeyCreateCallable();
SchemaJob callable = new OlapPropertyKeyCreateJob();
return asyncRun(this.graph(), propertyKey, callable);
}

public Id clearOlapPk(PropertyKey propertyKey) {
LOG.debug("SchemaTransaction clear olap property key {} with id '{}'",
propertyKey.name(), propertyKey.id());
SchemaCallable callable = new OlapPropertyKeyClearCallable();
SchemaJob callable = new OlapPropertyKeyClearJob();
return asyncRun(this.graph(), propertyKey, callable);
}

public Id removeOlapPk(PropertyKey propertyKey) {
LOG.debug("SchemaTransaction remove olap property key {} with id '{}'",
propertyKey.name(), propertyKey.id());
SchemaCallable callable = new OlapPropertyKeyRemoveCallable();
SchemaJob callable = new OlapPropertyKeyRemoveJob();
return asyncRun(this.graph(), propertyKey, callable);
}

Expand Down Expand Up @@ -543,17 +543,17 @@ private static void setCreateTimeIfNeeded(SchemaElement schema) {
}

private static Id asyncRun(HugeGraph graph, SchemaElement schema,
SchemaCallable callable) {
SchemaJob callable) {
return asyncRun(graph, schema, callable, ImmutableSet.of());
}

@Watched(prefix = "schema")
private static Id asyncRun(HugeGraph graph, SchemaElement schema,
SchemaCallable callable, Set<Id> dependencies) {
SchemaJob callable, Set<Id> dependencies) {
E.checkArgument(schema != null, "Schema can't be null");
String name = SchemaCallable.formatTaskName(schema.type(),
schema.id(),
schema.name());
String name = SchemaJob.formatTaskName(schema.type(),
schema.id(),
schema.name());

JobBuilder<Object> builder = JobBuilder.of(graph).name(name)
.job(callable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class EdgeLabelRemoveCallable extends SchemaCallable {
public class EdgeLabelRemoveJob extends SchemaJob {

@Override
public String type() {
Expand All @@ -43,15 +43,19 @@ public Object execute() {
return null;
}

protected static void removeEdgeLabel(HugeGraphParams graph, Id id) {
private static void removeEdgeLabel(HugeGraphParams graph, Id id) {
GraphTransaction graphTx = graph.graphTransaction();
SchemaTransaction schemaTx = graph.schemaTransaction();
EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id);
// If the edge label does not exist, return directly
if (edgeLabel == null) {
return;
}
// TODO: use event to replace direct call
if (edgeLabel.status().deleting()) {
LOG.info("The edge label '{}' has been in {} status, " +
"please check if it's expected to delete it again",
edgeLabel, edgeLabel.status());
}
// Remove index related data(include schema) of this edge label
Set<Id> indexIds = ImmutableSet.copyOf(edgeLabel.indexLabels());
LockUtil.Locks locks = new LockUtil.Locks(graph.name());
Expand All @@ -60,16 +64,18 @@ protected static void removeEdgeLabel(HugeGraphParams graph, Id id) {
schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.DELETING);
try {
for (Id indexId : indexIds) {
IndexLabelRemoveCallable.removeIndexLabel(graph, indexId);
IndexLabelRemoveJob.removeIndexLabel(graph, indexId);
}
// Remove all edges which has matched label
// TODO: use event to replace direct call
graphTx.removeEdges(edgeLabel);
removeSchema(schemaTx, edgeLabel);
/*
* Should commit changes to backend store before release
* delete lock
*/
graph.graph().tx().commit();
// Remove edge label
removeSchema(schemaTx, edgeLabel);
} catch (Throwable e) {
schemaTx.updateSchemaStatus(edgeLabel, SchemaStatus.UNDELETED);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class RebuildIndexCallable extends SchemaCallable {
public class IndexLabelRebuildJob extends SchemaJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.baidu.hugegraph.type.define.SchemaStatus;
import com.baidu.hugegraph.util.LockUtil;

public class IndexLabelRemoveCallable extends SchemaCallable {
public class IndexLabelRemoveJob extends SchemaJob {

@Override
public String type() {
Expand All @@ -48,6 +48,11 @@ protected static void removeIndexLabel(HugeGraphParams graph, Id id) {
if (indexLabel == null) {
return;
}
if (indexLabel.status().deleting()) {
LOG.info("The index label '{}' has been in {} status, " +
"please check if it's expected to delete it again",
indexLabel, indexLabel.status());
}
LockUtil.Locks locks = new LockUtil.Locks(graph.name());
try {
locks.lockWrites(LockUtil.INDEX_LABEL_DELETE, id);
Expand All @@ -60,14 +65,15 @@ protected static void removeIndexLabel(HugeGraphParams graph, Id id) {
// Remove index data
// TODO: use event to replace direct call
graphTx.removeIndex(indexLabel);
removeSchema(schemaTx, indexLabel);
/*
* Should commit changes to backend store before release
* delete lock
*/
graph.graph().tx().commit();
// Remove index label
removeSchema(schemaTx, indexLabel);
} catch (Throwable e) {
schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.INVALID);
schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.UNDELETED);
throw e;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.baidu.hugegraph.type.define.SchemaStatus;
import com.baidu.hugegraph.util.LockUtil;

public class OlapPropertyKeyClearCallable extends IndexLabelRemoveCallable {
public class OlapPropertyKeyClearJob extends IndexLabelRemoveJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
import com.baidu.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyCreateCallable extends SchemaCallable {
public class OlapPropertyKeyCreateJob extends SchemaJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
import com.baidu.hugegraph.schema.PropertyKey;

public class OlapPropertyKeyRemoveCallable
extends OlapPropertyKeyClearCallable {
public class OlapPropertyKeyRemoveJob extends OlapPropertyKeyClearJob {

@Override
public String type() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.slf4j.Logger;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
Expand All @@ -13,8 +15,9 @@
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;

public abstract class SchemaCallable extends SysJob<Object> {
public abstract class SchemaJob extends SysJob<Object> {

public static final String REMOVE_SCHEMA = "remove_schema";
public static final String REBUILD_INDEX = "rebuild_index";
Expand All @@ -23,6 +26,8 @@ public abstract class SchemaCallable extends SysJob<Object> {
public static final String CLEAR_OLAP = "clear_olap";
public static final String REMOVE_OLAP = "remove_olap";

protected static final Logger LOG = Log.logger(SchemaJob.class);

private static final String SPLITOR = ":";

protected HugeType schemaType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.baidu.hugegraph.util.LockUtil;
import com.google.common.collect.ImmutableSet;

public class VertexLabelRemoveCallable extends SchemaCallable {
public class VertexLabelRemoveJob extends SchemaJob {

@Override
public String type() {
Expand All @@ -54,7 +54,13 @@ private static void removeVertexLabel(HugeGraphParams graph, Id id) {
if (vertexLabel == null) {
return;
}
if (vertexLabel.status().deleting()) {
LOG.info("The vertex label '{}' has been in {} status, " +
"please check if it's expected to delete it again",
vertexLabel, vertexLabel.status());
}

// Check no edge label use the vertex label
List<EdgeLabel> edgeLabels = schemaTx.getEdgeLabels();
for (EdgeLabel edgeLabel : edgeLabels) {
if (edgeLabel.linkWithLabel(id)) {
Expand All @@ -76,19 +82,21 @@ private static void removeVertexLabel(HugeGraphParams graph, Id id) {
schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.DELETING);
try {
for (Id ilId : indexLabelIds) {
IndexLabelRemoveCallable.removeIndexLabel(graph, ilId);
IndexLabelRemoveJob.removeIndexLabel(graph, ilId);
}
// TODO: use event to replace direct call
// Deleting a vertex will automatically deletes the held edge
graphTx.removeVertices(vertexLabel);
removeSchema(schemaTx, vertexLabel);
/*
* Should commit changes to backend store before release
* delete lock
*/
graph.graph().tx().commit();
// Remove vertex label
removeSchema(schemaTx, vertexLabel);
} catch (Throwable e) {
schemaTx.updateSchemaStatus(vertexLabel, SchemaStatus.UNDELETED);
schemaTx.updateSchemaStatus(vertexLabel,
SchemaStatus.UNDELETED);
throw e;
}
} finally {
Expand Down