Skip to content

Commit

Permalink
mprove schema deletion by paging for large data
Browse files Browse the repository at this point in the history
implemented: #416

Change-Id: I61f11dfb6367acab9369b53af57c9114a11782c7
  • Loading branch information
zhoney committed Mar 29, 2019
1 parent 60d8d8f commit ea35cb1
Showing 1 changed file with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import com.baidu.hugegraph.iterator.FilterIterator;
import com.baidu.hugegraph.iterator.FlatMapperIterator;
import com.baidu.hugegraph.iterator.MapperIterator;
import com.baidu.hugegraph.iterator.Metadatable;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
Expand All @@ -88,6 +89,7 @@
public class GraphTransaction extends IndexableTransaction {

public static final int COMMIT_BATCH = 500;
private static final long TRAVERSE_BATCH = 100_000L;

private final GraphIndexTransaction indexTx;

Expand Down Expand Up @@ -1410,6 +1412,20 @@ public void traverseEdgesByLabel(EdgeLabel label, Consumer<Edge> consumer,
private <T> void traverseByLabel(SchemaLabel label,
Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean remove) {
if (!this.store().features().supportsQueryByPage()) {
this.traverseByLabelWithoutPage(label, fetcher, consumer, remove);
return;
}
if (!label.enableLabelIndex()) {
this.traverseByLabelWithoutIndexInPage(label, fetcher, consumer);
} else {
this.traverseByLabelWithIndexInPage(label, fetcher, consumer);
}
}

private <T> void traverseByLabelWithoutPage(
SchemaLabel label, Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer, boolean remove) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
ConditionQuery query = new ConditionQuery(type);
Expand Down Expand Up @@ -1453,4 +1469,58 @@ private <T> void traverseByLabel(SchemaLabel label,
assert counter <= Query.DEFAULT_CAPACITY;
} while (counter == Query.DEFAULT_CAPACITY); // If not, means finish
}

private <T> void traverseByLabelWithIndexInPage(
SchemaLabel label, Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
ConditionQuery query = new ConditionQuery(type);
query.eq(HugeKeys.LABEL, label.id());
query.limit(TRAVERSE_BATCH);
// Whether query system vertices
if (label.hidden()) {
query.showHidden(true);
}

Iterator<T> itor;
String page = "";
while (page != null) {
query.page(page);
itor = fetcher.apply(query);
while (itor.hasNext()) {
consumer.accept(itor.next());
}
page = (String) ((Metadatable) itor).metadata("page");
}
}

private <T> void traverseByLabelWithoutIndexInPage(
SchemaLabel label, Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
Query query = new Query(type);
query.limit(TRAVERSE_BATCH);
// Whether query system vertices
if (label.hidden()) {
query.showHidden(true);
}

Iterator<T> itor;
// Not support label index, query all and filter by label
String page = "";
while (page != null) {
query.page(page);
itor = fetcher.apply(query);
while (itor.hasNext()) {
T e = itor.next();
SchemaLabel elemLabel = ((HugeElement) e).schemaLabel();
if (label.equals(elemLabel)) {
consumer.accept(e);
}
}
page = (String) ((Metadatable) itor).metadata("page");
}
}
}

0 comments on commit ea35cb1

Please sign in to comment.