Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit index updates in batch #150

Merged
merged 1 commit into from
Nov 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ public Map<String, Id> post(@Context GraphManager manager,

HugeGraph g = graph(manager, graph);
request.aliase(graph, "graph");
JobBuilder builder = JobBuilder.of(g).name(request.name())
.input(request.toJson())
.job(new GremlinJob());
return ImmutableMap.of("task_id", builder.schedule());
JobBuilder<Object> builder = JobBuilder.of(g);
builder.name(request.name())
.input(request.toJson())
.job(new GremlinJob());
return ImmutableMap.of("task_id", builder.schedule().id());
}

public static class GremlinJob extends Job<Object> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ public boolean closed() {
return this.refs.get() == 0;
}

public void commitIfGtSize(int size) {
// Only committing graph transaction data if reaching batch size
// is OK, bacause schema transaction is auto committed.
this.graphTransaction().commitIfGtSize(size);
}

@Override
public void commit() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public interface Transaction {

public void commit() throws BackendException;

public void commitIfGtSize(int size) throws BackendException;

public void rollback() throws BackendException;

public boolean autoCommit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,9 @@ public Cache cache(String name) {
}

public Cache cache(String name, int capacity) {
Cache cache = this.caches.get(name);
if (cache == null) {
cache = new RamCache(capacity);
this.caches.put(name, cache);
if (!this.caches.containsKey(name)) {
this.caches.putIfAbsent(name, new RamCache(capacity));
}
return cache;
return this.caches.get(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ public void commit() throws BackendException {
}
}

public void commitIfGtSize(int size) throws BackendException {
if (this.mutationSize() >= size) {
this.commit();
}
}

@Watched(prefix = "tx")
@Override
public void rollback() throws BackendException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class GraphIndexTransaction extends AbstractTransaction {

private static final String INDEX_EMPTY_SYM = "\u0000";
private static final Query EMPTY_QUERY = new ConditionQuery(null);
private static final int REBUILD_COMMIT_BATCH = 1000;

private final Analyzer textAnalyzer;

Expand Down Expand Up @@ -1100,30 +1101,13 @@ public void rebuildIndex(HugeType type, Id label,
* They have different id lead to it can't compare and optimize
*/
this.commit();

if (type == HugeType.VERTEX_LABEL) {
ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
query.eq(HugeKeys.LABEL, label);
query.capacity(Query.NO_CAPACITY);
for (Iterator<Vertex> itor = graphTx.queryVertices(query);
itor.hasNext();) {
HugeVertex vertex = (HugeVertex) itor.next();
for (Id id : indexLabelIds) {
this.updateIndex(id, vertex, false);
}
}
this.rebuildIndex(HugeType.VERTEX, label, indexLabelIds,
graphTx::queryVerticesFromBackend);
} else {
assert type == HugeType.EDGE_LABEL;
ConditionQuery query = new ConditionQuery(HugeType.EDGE);
query.eq(HugeKeys.LABEL, label);
query.capacity(Query.NO_CAPACITY);
for (Iterator<Edge> itor = graphTx.queryEdges(query);
itor.hasNext();) {
HugeEdge edge = (HugeEdge) itor.next();
for (Id id : indexLabelIds) {
this.updateIndex(id, edge, false);
}
}
this.rebuildIndex(HugeType.EDGE, label, indexLabelIds,
graphTx::queryEdgesFromBackend);
}
this.commit();

Expand All @@ -1136,6 +1120,34 @@ public void rebuildIndex(HugeType type, Id label,
}
}

private <T> void rebuildIndex(HugeType type, Id label,
Collection<Id> indexLabelIds,
Function<Query, Iterator<T>> fetcher) {
assert type == HugeType.VERTEX || type == HugeType.EDGE;
ConditionQuery query = new ConditionQuery(type);
query.eq(HugeKeys.LABEL, label);
query.limit(Query.DEFAULT_CAPACITY);

int pass = 0;
int counter;
do {
counter = 0;
query.offset(pass++ * Query.DEFAULT_CAPACITY);
Iterator<T> itor = fetcher.apply(query);
while (itor.hasNext()) {
HugeElement element = (HugeElement) itor.next();
for (Id id : indexLabelIds) {
this.updateIndex(id, element, false);
// Commit per small batch to avoid too much data
// in single commit, especially for Cassandra backend.
this.commitIfGtSize(REBUILD_COMMIT_BATCH);
}
++counter;
}
assert counter <= Query.DEFAULT_CAPACITY;
} while (counter == Query.DEFAULT_CAPACITY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert counter <= Query.DEFAULT_CAPACITY

}

private static class MatchedLabel {

private SchemaLabel schemaLabel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public Iterator<Vertex> queryVertices(Query query) {
return r;
}

private Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
protected Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
assert query.resultType().isVertex();

Iterator<BackendEntry> entries = this.query(query);
Expand Down Expand Up @@ -562,7 +562,7 @@ public Iterator<Edge> queryEdges(Query query) {
return r;
}

private Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
assert query.resultType().isEdge();

Iterator<BackendEntry> entries = this.query(query);
Expand Down
Loading