diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java index cedf72c945..e3b1c9f443 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java @@ -558,12 +558,18 @@ public void delete(CassandraSessionPool.Session session, } final String FIELD_VALUES = formatKey(HugeKeys.FIELD_VALUES); + int count = 0; for (Iterator it = rs.iterator(); it.hasNext();) { fieldValues = it.next().get(FIELD_VALUES, String.class); Delete delete = QueryBuilder.delete().from(this.table()); delete.where(formatEQ(HugeKeys.INDEX_LABEL_ID, indexLabel)); delete.where(formatEQ(HugeKeys.FIELD_VALUES, fieldValues)); session.add(delete); + + if (++count >= COMMIT_DELETE_BATCH) { + session.commit(); + count = 0; + } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index 8120fa5e33..fbe14beb9f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -1365,7 +1365,7 @@ public void removeVertices(VertexLabel vertexLabel) { this.traverseVerticesByLabel(vertexLabel, vertex -> { this.removeVertex((HugeVertex) vertex); this.commitIfGtSize(COMMIT_BATCH); - }); + }, true); this.commit(); } catch (Exception e) { LOG.error("Failed to remove vertices", e); @@ -1395,7 +1395,7 @@ public void removeEdges(EdgeLabel edgeLabel) { this.traverseEdgesByLabel(edgeLabel, edge -> { this.removeEdge((HugeEdge) edge); this.commitIfGtSize(COMMIT_BATCH); - }); + }, true); } this.commit(); } catch (Exception e) { @@ -1407,18 +1407,19 @@ public void removeEdges(EdgeLabel edgeLabel) { } public void traverseVerticesByLabel(VertexLabel label, - Consumer consumer) { - this.traverseByLabel(label, this::queryVertices, consumer); + Consumer consumer, + boolean remove) { + this.traverseByLabel(label, this::queryVertices, consumer, remove); } - public void traverseEdgesByLabel(EdgeLabel label, - Consumer consumer) { - this.traverseByLabel(label, this::queryEdges, consumer); + public void traverseEdgesByLabel(EdgeLabel label, Consumer consumer, + boolean remove) { + this.traverseByLabel(label, this::queryEdges, consumer, remove); } private void traverseByLabel(SchemaLabel label, Function> fetcher, - Consumer consumer) { + Consumer consumer, boolean remove) { HugeType type = label.type() == HugeType.VERTEX_LABEL ? HugeType.VERTEX : HugeType.EDGE; ConditionQuery query = new ConditionQuery(type); @@ -1449,9 +1450,11 @@ private void traverseByLabel(SchemaLabel label, query.capacity(Query.NO_CAPACITY); query.eq(HugeKeys.LABEL, label.id()); int pass = 0; - int counter = 0; + int counter; do { - query.offset(pass++ * Query.DEFAULT_CAPACITY); + if (!remove) { + query.offset(pass++ * Query.DEFAULT_CAPACITY); + } // Process every element in current batch Iterator itor = fetcher.apply(query); for (counter = 0; itor.hasNext(); ++counter) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java index 4431d8b974..1fbb49d00c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java @@ -123,12 +123,14 @@ private void rebuildIndex(SchemaLabel label, Collection indexLabelIds) { if (label.type() == HugeType.VERTEX_LABEL) { @SuppressWarnings("unchecked") Consumer consumer = (Consumer) indexUpdater; - graphTx.traverseVerticesByLabel((VertexLabel) label, consumer); + graphTx.traverseVerticesByLabel((VertexLabel) label, + consumer, false); } else { assert label.type() == HugeType.EDGE_LABEL; @SuppressWarnings("unchecked") Consumer consumer = (Consumer) indexUpdater; - graphTx.traverseEdgesByLabel((EdgeLabel) label, consumer); + graphTx.traverseEdgesByLabel((EdgeLabel) label, + consumer, false); } graphTx.commit();