From 490b0cd206f514c9da25ff8bfcc0ab2cc2ad66c9 Mon Sep 17 00:00:00 2001 From: zhoney Date: Mon, 8 Apr 2019 11:45:29 +0800 Subject: [PATCH] Improve schema deletion by paging for large data (#417) implemented: #416 Change-Id: I61f11dfb6367acab9369b53af57c9114a11782c7 --- .../hugegraph/backend/page/IdHolderList.java | 2 + .../backend/page/PageEntryIterator.java | 16 ++-- .../hugegraph/backend/page/QueryList.java | 31 ++++--- .../baidu/hugegraph/backend/query/Query.java | 19 +++- .../backend/store/BackendEntryIterator.java | 24 +++++ .../backend/tx/GraphIndexTransaction.java | 2 +- .../backend/tx/GraphTransaction.java | 88 +++++++++++-------- .../baidu/hugegraph/config/CoreOptions.java | 4 +- .../store/scylladb/ScyllaDBTables.java | 20 ++--- 9 files changed, 139 insertions(+), 67 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/IdHolderList.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/IdHolderList.java index 98c8428233..8e6db384e8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/IdHolderList.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/IdHolderList.java @@ -26,6 +26,8 @@ public final class IdHolderList extends ArrayList { + private static final long serialVersionUID = -738694176552424990L; + private final boolean paging; public IdHolderList(boolean paging) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageEntryIterator.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageEntryIterator.java index 70b5532b28..e03a98b315 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageEntryIterator.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageEntryIterator.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; +import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.iterator.Metadatable; @@ -44,7 +45,7 @@ public PageEntryIterator(QueryList queries, long pageSize) { } private PageState parsePageState() { - String page = this.queries.parent().page(); + String page = this.queries.parent().pageWithoutCheck(); PageState pageState = PageState.fromString(page); E.checkState(pageState.offset() < this.queries.total(), "Invalid page '%s' with an offset '%s' exceeds " + @@ -61,13 +62,15 @@ public boolean hasNext() { } private boolean fetch() { - if (this.remaining <= 0 || + if ((this.remaining != Query.NO_LIMIT && this.remaining <= 0L) || this.pageState.offset() >= this.queries.total()) { return false; } - long pageSize = this.remaining < this.pageSize ? - this.remaining : this.pageSize; + long pageSize = this.pageSize; + if (this.remaining != Query.NO_LIMIT && this.remaining < pageSize) { + pageSize = this.remaining; + } this.results = this.queries.fetchNext(this.pageState, pageSize); assert this.results != null; @@ -90,7 +93,10 @@ public BackendEntry next() { throw new NoSuchElementException(); } BackendEntry entry = this.results.iterator().next(); - this.remaining--; + if (this.remaining != Query.NO_LIMIT) { + // Assume one result in each entry (just for index query) + this.remaining--; + } return entry; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/QueryList.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/QueryList.java index 2aed96b807..ded4b09518 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/QueryList.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/QueryList.java @@ -90,7 +90,7 @@ public boolean empty() { } public Iterator fetch() { - assert !queries.isEmpty(); + assert !this.queries.isEmpty(); if (this.parent.paging()) { int pageSize = this.graph.configuration() .get(CoreOptions.QUERY_PAGE_SIZE); @@ -122,6 +122,7 @@ protected PageIterator fetchNext(PageState pageState, long pageSize) { return query.iterator(offset - current, pageState.page(), pageSize); } + @SuppressWarnings("unused") private static Set limit(Set ids, Query query) { long fromIndex = query.offset(); E.checkArgument(fromIndex <= Integer.MAX_VALUE, @@ -167,7 +168,7 @@ private interface QueryHolder { } /** - * Generate queries from index.optimizeQuery() + * Generate queries from tx.optimizeQuery() */ private class OptimizedQuery implements QueryHolder { @@ -183,9 +184,14 @@ public Iterator iterator() { public PageIterator iterator(int index, String page, long pageSize) { assert index == 0; - this.query.page(page); - Iterator iterator = fetcher.apply(this.query); - // Must iterate all entries before getting the next page info + Query query = this.query.copy(); + query.page(page); + // Not set limit to pageSize due to PageEntryIterator.remaining + if (this.query.limit() == Query.NO_LIMIT) { + query.limit(pageSize); + } + Iterator iterator = fetcher.apply(query); + // Must iterate all entries before get the next page List results = IteratorUtils.list(iterator); return new PageIterator(results.iterator(), PageState.page(iterator)); @@ -197,7 +203,7 @@ public int total() { } /** - * Generate queries from index.indexQuery() + * Generate queries from tx.indexQuery() */ private class IndexQuery implements QueryHolder { @@ -214,10 +220,15 @@ public Iterator iterator() { return null; } Set ids = holder.ids(); - if (ids.size() > parent.limit()) { - ids = limit(ids, parent); + if (parent.limit() != Query.NO_LIMIT && + ids.size() > parent.limit()) { + /* + * Avoid too many ids in one time query, + * Assume it will get one result by each id + */ + ids = CollectionUtil.subSet(ids, 0, (int) parent.limit()); } - IdQuery query = new IdQuery(parent.resultType(), ids); + IdQuery query = new IdQuery(parent, ids); return fetcher.apply(query); }); } @@ -228,7 +239,7 @@ public PageIterator iterator(int index, String page, long pageSize) { if (pageIds.empty()) { return PageIterator.EMPTY; } - IdQuery query = new IdQuery(parent.resultType(), pageIds.ids()); + IdQuery query = new IdQuery(parent, pageIds.ids()); return new PageIterator(fetcher.apply(query), pageIds.page()); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java index 406ccb5a31..bdd35bc6c8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import com.baidu.hugegraph.backend.BackendException; @@ -31,7 +32,6 @@ import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.E; -import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; public class Query implements Cloneable { @@ -48,6 +48,7 @@ public class Query implements Cloneable { private String page; private long capacity; private boolean showHidden; + private boolean showDeleting; private Query originQuery; @@ -70,6 +71,7 @@ public Query(HugeType resultType, Query originQuery) { this.capacity = defaultCapacity(); this.showHidden = false; + this.showDeleting = false; } public HugeType resultType() { @@ -174,6 +176,10 @@ public String page() { return this.page; } + public String pageWithoutCheck() { + return this.page; + } + public void page(String page) { this.page = page; } @@ -212,6 +218,14 @@ public void showHidden(boolean showHidden) { this.showHidden = showHidden; } + public boolean showDeleting() { + return this.showDeleting; + } + + public void showDeleting(boolean showDeleting) { + this.showDeleting = showDeleting; + } + public Set ids() { return ImmutableSet.of(); } @@ -246,8 +260,7 @@ public boolean equals(Object object) { this.orders.equals(other.orders) && this.offset == other.offset && this.limit == other.limit && - ((this.page == null && other.page == null) || - this.page.equals(other.page)) && + Objects.equals(this.page, other.page) && this.ids().equals(other.ids()) && this.conditions().equals(other.conditions()); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java index d179240050..38f46883fd 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java @@ -27,6 +27,7 @@ import com.baidu.hugegraph.exception.LimitExceedException; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.iterator.Metadatable; +import com.baidu.hugegraph.util.E; public abstract class BackendEntryIterator implements Iterator, AutoCloseable, Metadatable { @@ -38,6 +39,7 @@ public abstract class BackendEntryIterator private long count; public BackendEntryIterator(Query query) { + E.checkNotNull(query, "query"); this.query = query; this.count = 0L; this.current = null; @@ -150,4 +152,26 @@ protected long skip(BackendEntry entry, long skip) { protected abstract boolean fetch(); protected abstract String pageState(); + + public static final class EmptyIterator extends BackendEntryIterator { + + public EmptyIterator(Query query) { + super(query); + } + + @Override + protected boolean fetch() { + return false; + } + + @Override + protected String pageState() { + return null; + } + + @Override + public void close() throws Exception { + return; + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java index b7deb5837d..3ee581a73c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java @@ -285,7 +285,7 @@ private List queryByLabel(ConditionQuery query) { indexQuery.eq(HugeKeys.INDEX_LABEL_ID, il.id()); indexQuery.eq(HugeKeys.FIELD_VALUES, label); // Set offset and limit to avoid redundant element ids - indexQuery.page(query.page()); + indexQuery.page(query.pageWithoutCheck()); indexQuery.limit(query.limit()); indexQuery.offset(query.offset()); indexQuery.capacity(query.capacity()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index c3b178379b..be78485b76 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -46,6 +46,7 @@ import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.id.SplicingIdGenerator; import com.baidu.hugegraph.backend.page.IdHolder; +import com.baidu.hugegraph.backend.page.PageState; import com.baidu.hugegraph.backend.page.QueryList; import com.baidu.hugegraph.backend.query.Condition; import com.baidu.hugegraph.backend.query.ConditionQuery; @@ -63,6 +64,7 @@ import com.baidu.hugegraph.iterator.FilterIterator; import com.baidu.hugegraph.iterator.FlatMapperIterator; import com.baidu.hugegraph.iterator.MapperIterator; +import com.baidu.hugegraph.iterator.Metadatable; import com.baidu.hugegraph.perf.PerfUtil.Watched; import com.baidu.hugegraph.schema.EdgeLabel; import com.baidu.hugegraph.schema.IndexLabel; @@ -80,6 +82,7 @@ import com.baidu.hugegraph.type.define.Directions; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.type.define.IdStrategy; +import com.baidu.hugegraph.type.define.SchemaStatus; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.InsertionOrderUtil; import com.baidu.hugegraph.util.LockUtil; @@ -88,6 +91,7 @@ public class GraphTransaction extends IndexableTransaction { public static final int COMMIT_BATCH = 500; + private static final long TRAVERSE_BATCH = 100_000L; private final GraphIndexTransaction indexTx; @@ -480,6 +484,11 @@ public Iterator queryVertices(Query query) { if (!query.showHidden() && Graph.Hidden.isHidden(vertex.label())) { return false; } + // Filter vertices of deleting vertex label + if (vertex.schemaLabel().status() == SchemaStatus.DELETING && + !query.showDeleting()) { + return false; + } // Process results that query from left index or primary-key if (query.resultType().isVertex() && !filterResultFromIndexQuery(query, vertex)) { @@ -606,6 +615,11 @@ public Iterator queryEdges(Query query) { if (!query.showHidden() && Graph.Hidden.isHidden(edge.label())) { return false; } + // Filter edges of deleting edge label + if (edge.schemaLabel().status() == SchemaStatus.DELETING && + !query.showDeleting()) { + return false; + } // Process results that query from left index if (!this.filterResultFromIndexQuery(query, edge)) { return false; @@ -1398,59 +1412,59 @@ public void removeEdges(EdgeLabel edgeLabel) { public void traverseVerticesByLabel(VertexLabel label, Consumer consumer, - boolean remove) { - this.traverseByLabel(label, this::queryVertices, consumer, remove); + boolean deleting) { + this.traverseByLabel(label, this::queryVertices, consumer, deleting); } public void traverseEdgesByLabel(EdgeLabel label, Consumer consumer, - boolean remove) { - this.traverseByLabel(label, this::queryEdges, consumer, remove); + boolean deleting) { + this.traverseByLabel(label, this::queryEdges, consumer, deleting); } private void traverseByLabel(SchemaLabel label, Function> fetcher, - Consumer consumer, boolean remove) { + Consumer consumer, boolean deleting) { HugeType type = label.type() == HugeType.VERTEX_LABEL ? HugeType.VERTEX : HugeType.EDGE; - ConditionQuery query = new ConditionQuery(type); - // Whether query system vertices + Query query = label.enableLabelIndex() ? + new ConditionQuery(type) : + new Query(type); + query.capacity(Query.NO_CAPACITY); + query.limit(Query.NO_LIMIT); + if (this.store().features().supportsQueryByPage()) { + query.page(PageState.PAGE_NONE); + } if (label.hidden()) { query.showHidden(true); } + query.showDeleting(deleting); - // Not support label index, query all and filter by label - if (!label.enableLabelIndex()) { - query.capacity(Query.NO_CAPACITY); + if (label.enableLabelIndex()) { + // Support label index, query by label index + ((ConditionQuery) query).eq(HugeKeys.LABEL, label.id()); Iterator itor = fetcher.apply(query); while (itor.hasNext()) { - T e = itor.next(); - SchemaLabel elemLabel = ((HugeElement) e).schemaLabel(); - if (label.equals(elemLabel)) { - consumer.accept(e); - } - } - return; - } - - /* - * Support label index, query by label. Set limit&capacity to - * Query.DEFAULT_CAPACITY to limit elements number per pass - */ - query.limit(Query.DEFAULT_CAPACITY); - query.capacity(Query.NO_CAPACITY); - query.eq(HugeKeys.LABEL, label.id()); - int pass = 0; - int counter; - do { - if (!remove) { - query.offset(pass++ * Query.DEFAULT_CAPACITY); - } - // Process every element in current batch - Iterator itor = fetcher.apply(query); - for (counter = 0; itor.hasNext(); ++counter) { consumer.accept(itor.next()); } - assert counter <= Query.DEFAULT_CAPACITY; - } while (counter == Query.DEFAULT_CAPACITY); // If not, means finish + } else { + // Not support label index, query all and filter by label + if (query.paging()) { + query.limit(TRAVERSE_BATCH); + } + String page = null; + do { + Iterator itor = fetcher.apply(query); + while (itor.hasNext()) { + T e = itor.next(); + SchemaLabel elemLabel = ((HugeElement) e).schemaLabel(); + if (label.equals(elemLabel)) { + consumer.accept(e); + } + } + if (query.paging()) { + page = PageState.page(itor); + } + } while (page != null); + } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java index b2cece77e8..8623df8bb6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.config; +import com.baidu.hugegraph.backend.query.Query; + import static com.baidu.hugegraph.backend.tx.GraphTransaction.COMMIT_BATCH; import static com.baidu.hugegraph.config.OptionChecker.disallowEmpty; import static com.baidu.hugegraph.config.OptionChecker.rangeInt; @@ -211,7 +213,7 @@ public static synchronized CoreOptions instance() { new ConfigOption<>( "query.page_size", "The size of each page when query using paging.", - rangeInt(0, 10000), + rangeInt(1, (int) Query.DEFAULT_CAPACITY), 500 ); diff --git a/hugegraph-scylladb/src/main/java/com/baidu/hugegraph/backend/store/scylladb/ScyllaDBTables.java b/hugegraph-scylladb/src/main/java/com/baidu/hugegraph/backend/store/scylladb/ScyllaDBTables.java index 4e1305cb9d..a4094369c5 100644 --- a/hugegraph-scylladb/src/main/java/com/baidu/hugegraph/backend/store/scylladb/ScyllaDBTables.java +++ b/hugegraph-scylladb/src/main/java/com/baidu/hugegraph/backend/store/scylladb/ScyllaDBTables.java @@ -33,6 +33,7 @@ import com.baidu.hugegraph.backend.query.ConditionQuery; import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.store.BackendEntry; +import com.baidu.hugegraph.backend.store.BackendEntryIterator; import com.baidu.hugegraph.backend.store.cassandra.CassandraBackendEntry; import com.baidu.hugegraph.backend.store.cassandra.CassandraSessionPool; import com.baidu.hugegraph.backend.store.cassandra.CassandraTable; @@ -123,7 +124,7 @@ private static Query queryByLabelIndex( } ConditionQuery q = (ConditionQuery) query; - Id label = (Id) q.condition(HugeKeys.LABEL); + Id label = q.condition(HugeKeys.LABEL); if (label != null && q.allSysprop() && conditions.size() == 1 && q.containsCondition(HugeKeys.LABEL, Condition.RelationType.EQ)) { @@ -211,11 +212,11 @@ public void delete(CassandraSessionPool.Session session, public Iterator query( CassandraSessionPool.Session session, Query query) { - query = queryByLabelIndex(session, indexTable(), query); - if (query == null) { - return ImmutableList.of().iterator(); + Query idQuery = queryByLabelIndex(session, indexTable(), query); + if (idQuery == null) { + return new BackendEntryIterator.EmptyIterator(query); } - return super.query(session, query); + return super.query(session, idQuery); } } @@ -317,12 +318,11 @@ protected void deleteEdgesByLabel(CassandraSessionPool.Session session, @Override public Iterator query( CassandraSessionPool.Session session, Query query) { - query = queryByLabelIndex(session, indexTable(), query); - - if (query == null) { - return ImmutableList.of().iterator(); + Query idQuery = queryByLabelIndex(session, indexTable(), query); + if (idQuery == null) { + return new BackendEntryIterator.EmptyIterator(query); } - return super.query(session, query); + return super.query(session, idQuery); } public static Edge out(String store) {