Skip to content

Commit

Permalink
fix bug: remove secondary index of cassandra backend exceeding 65535 …
Browse files Browse the repository at this point in the history
…limit

fixed: #384

Change-Id: I3b0db265b621c149404a97ab688af8bd2e4ca4bd
  • Loading branch information
zhoney committed Mar 8, 2019
1 parent 6eaeee8 commit 69c53b7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,18 @@ public void delete(CassandraSessionPool.Session session,
}

final String FIELD_VALUES = formatKey(HugeKeys.FIELD_VALUES);
int count = 0;
for (Iterator<Row> it = rs.iterator(); it.hasNext();) {
fieldValues = it.next().get(FIELD_VALUES, String.class);
Delete delete = QueryBuilder.delete().from(this.table());
delete.where(formatEQ(HugeKeys.INDEX_LABEL_ID, indexLabel));
delete.where(formatEQ(HugeKeys.FIELD_VALUES, fieldValues));
session.add(delete);

if (++count >= COMMIT_DELETE_BATCH) {
session.commit();
count = 0;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ public void removeVertices(VertexLabel vertexLabel) {
this.traverseVerticesByLabel(vertexLabel, vertex -> {
this.removeVertex((HugeVertex) vertex);
this.commitIfGtSize(COMMIT_BATCH);
});
}, true);
this.commit();
} catch (Exception e) {
LOG.error("Failed to remove vertices", e);
Expand Down Expand Up @@ -1354,7 +1354,7 @@ public void removeEdges(EdgeLabel edgeLabel) {
this.traverseEdgesByLabel(edgeLabel, edge -> {
this.removeEdge((HugeEdge) edge);
this.commitIfGtSize(COMMIT_BATCH);
});
}, true);
}
this.commit();
} catch (Exception e) {
Expand All @@ -1366,18 +1366,19 @@ public void removeEdges(EdgeLabel edgeLabel) {
}

public void traverseVerticesByLabel(VertexLabel label,
Consumer<Vertex> consumer) {
this.traverseByLabel(label, this::queryVertices, consumer);
Consumer<Vertex> consumer,
boolean remove) {
this.traverseByLabel(label, this::queryVertices, consumer, remove);
}

public void traverseEdgesByLabel(EdgeLabel label,
Consumer<Edge> consumer) {
this.traverseByLabel(label, this::queryEdges, consumer);
public void traverseEdgesByLabel(EdgeLabel label, Consumer<Edge> consumer,
boolean remove) {
this.traverseByLabel(label, this::queryEdges, consumer, remove);
}

private <T> void traverseByLabel(SchemaLabel label,
Function<Query, Iterator<T>> fetcher,
Consumer<T> consumer) {
Consumer<T> consumer, boolean remove) {
HugeType type = label.type() == HugeType.VERTEX_LABEL ?
HugeType.VERTEX : HugeType.EDGE;
ConditionQuery query = new ConditionQuery(type);
Expand Down Expand Up @@ -1408,9 +1409,11 @@ private <T> void traverseByLabel(SchemaLabel label,
query.capacity(Query.NO_CAPACITY);
query.eq(HugeKeys.LABEL, label.id());
int pass = 0;
int counter = 0;
int counter;
do {
query.offset(pass++ * Query.DEFAULT_CAPACITY);
if (!remove) {
query.offset(pass++ * Query.DEFAULT_CAPACITY);
}
// Process every element in current batch
Iterator<T> itor = fetcher.apply(query);
for (counter = 0; itor.hasNext(); ++counter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ private void rebuildIndex(SchemaLabel label, Collection<Id> indexLabelIds) {
if (label.type() == HugeType.VERTEX_LABEL) {
@SuppressWarnings("unchecked")
Consumer<Vertex> consumer = (Consumer<Vertex>) indexUpdater;
graphTx.traverseVerticesByLabel((VertexLabel) label, consumer);
graphTx.traverseVerticesByLabel((VertexLabel) label,
consumer, false);
} else {
assert label.type() == HugeType.EDGE_LABEL;
@SuppressWarnings("unchecked")
Consumer<Edge> consumer = (Consumer<Edge>) indexUpdater;
graphTx.traverseEdgesByLabel((EdgeLabel) label, consumer);
graphTx.traverseEdgesByLabel((EdgeLabel) label,
consumer, false);
}
graphTx.commit();

Expand Down

0 comments on commit 69c53b7

Please sign in to comment.