From db3ee80a38619e3568e334800b4b80139b1b1cad Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 11 Dec 2024 17:13:10 +0800 Subject: [PATCH] cache use on-heap id && adopt for disabled mode --- .../hugegraph/api/traversers/KoutAPI.java | 16 ++++++----- .../backend/cache/CachedGraphTransaction.java | 4 +++ .../cache/CachedSchemaTransaction.java | 13 +++++++-- .../cache/CachedSchemaTransactionV2.java | 19 +++++++++---- .../apache/hugegraph/config/CoreOptions.java | 2 +- .../hugegraph/memory/MemoryManager.java | 6 ++++- .../hugegraph/schema/SchemaElement.java | 9 ++++++- .../apache/hugegraph/structure/HugeEdge.java | 7 +++++ .../hugegraph/structure/HugeVertex.java | 8 ++++++ .../traversal/algorithm/HugeTraverser.java | 2 +- .../org/apache/hugegraph/util/Consumers.java | 27 ++++++++++++------- 11 files changed, 86 insertions(+), 27 deletions(-) diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java index e226e5eb00..65d59d8f8d 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.hugegraph.HugeGraph; @@ -97,11 +98,13 @@ public String get(@Context GraphManager manager, graph, source, direction, edgeLabel, depth, nearest, maxDegree, capacity, limit); MemoryPool queryPool = MemoryManager.getInstance().addQueryMemoryPool(); - MemoryPool currentTaskPool = queryPool.addChildPool("kout-main-task"); - MemoryManager.getInstance() - .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), - (TaskMemoryPool) currentTaskPool); - MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation"); + Optional.ofNullable(queryPool).ifPresent(pool -> { + MemoryPool currentTaskPool = pool.addChildPool("kout-main-task"); + MemoryManager.getInstance() + .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), + (TaskMemoryPool) currentTaskPool); + MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation"); + }); try { ApiMeasurer measure = new ApiMeasurer(); @@ -125,7 +128,8 @@ public String get(@Context GraphManager manager, } return manager.serializer(g, measure.measures()).writeList("vertices", ids); } finally { - queryPool.releaseSelf("Complete kout query", false); + Optional.ofNullable(queryPool) + .ifPresent(pool -> MemoryManager.getInstance().gcQueryMemoryPool(pool)); } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java index 83ab7f51ad..39cdce8fd1 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java @@ -260,6 +260,7 @@ private Iterator queryVerticesByIds(IdQuery query) { return QueryResults.emptyIterator(); } if (needCacheVertex(vertex)) { + vertex.convertIdToOnHeapIfNeeded(); this.verticesCache.update(vertex.id(), vertex); } return QueryResults.iterator(vertex); @@ -295,6 +296,7 @@ private Iterator queryVerticesByIds(IdQuery query) { for (HugeVertex vertex : listIterator.list()) { // Skip large vertex if (needCacheVertex(vertex)) { + vertex.convertIdToOnHeapIfNeeded(); this.verticesCache.update(vertex.id(), vertex); } } @@ -353,6 +355,7 @@ protected Iterator queryEdgesFromBackend(Query query) { if (edges.isEmpty()) { this.edgesCache.update(cacheKey, Collections.emptyList()); } else if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) { + edges.forEach(HugeEdge::convertIdToOnHeapIfNeeded); this.edgesCache.update(cacheKey, edges); } @@ -378,6 +381,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) { vertexIds[vertexOffset++] = vertex.id(); if (needCacheVertex(vertex)) { // Update cache + vertex.convertIdToOnHeapIfNeeded(); this.verticesCache.updateIfPresent(vertex.id(), vertex); } else { // Skip large vertex diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java index 4f9e5f5937..41502789b9 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -176,6 +176,9 @@ private void clearCache(boolean notify) { private void updateCache(SchemaElement schema) { this.resetCachedAllIfReachedCapacity(); + // convert schema.id to on heap if needed. + schema.convertIdToOnHeapIfNeeded(); + // update id cache Id prefixedId = generateId(schema.type(), schema.id()); this.idCache.update(prefixedId, schema); @@ -204,14 +207,20 @@ private void invalidateCache(HugeType type, Id id) { this.arrayCaches.remove(type, id); } + /** + * Ids used in cache must be on-heap object + */ private static Id generateId(HugeType type, Id id) { // NOTE: it's slower performance to use: // String.format("%x-%s", type.code(), name) - return IdGenerator.of(type.string() + "-" + id.asString()); + return new IdGenerator.StringId(type.string() + "-" + id.asString()); } + /** + * Ids used in cache must be on-heap object + */ private static Id generateId(HugeType type, String name) { - return IdGenerator.of(type.string() + "-" + name); + return new IdGenerator.StringId(type.string() + "-" + name); } @Override diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java index e6a5e78533..31bcf42382 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java @@ -42,6 +42,7 @@ import com.google.common.collect.ImmutableSet; public class CachedSchemaTransactionV2 extends SchemaTransactionV2 { + private final Cache idCache; private final Cache nameCache; @@ -51,8 +52,8 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 { private EventListener cacheEventListener; public CachedSchemaTransactionV2(MetaDriver metaDriver, - String cluster, - HugeGraphParams graphParams) { + String cluster, + HugeGraphParams graphParams) { super(metaDriver, cluster, graphParams); final long capacity = graphParams.configuration() @@ -223,6 +224,9 @@ protected void addSchema(SchemaElement schema) { private void updateCache(SchemaElement schema) { this.resetCachedAllIfReachedCapacity(); + // convert schema.id to on heap if needed. + schema.convertIdToOnHeapIfNeeded(); + // update id cache Id prefixedId = generateId(schema.type(), schema.id()); this.idCache.update(prefixedId, schema); @@ -268,10 +272,12 @@ protected T getSchema(HugeType type, Id id) { value = super.getSchema(type, id); if (value != null) { this.resetCachedAllIfReachedCapacity(); + // convert schema.id to on heap if needed. + SchemaElement schema = (SchemaElement) value; + schema.convertIdToOnHeapIfNeeded(); - this.idCache.update(prefixedId, value); + this.idCache.update(prefixedId, schema); - SchemaElement schema = (SchemaElement) value; Id prefixedName = generateId(schema.type(), schema.name()); this.nameCache.update(prefixedName, schema); } @@ -321,6 +327,9 @@ protected List getAllSchema(HugeType type) { if (results.size() <= free) { // Update cache for (T schema : results) { + // convert schema.id to on heap if needed. + schema.convertIdToOnHeapIfNeeded(); + Id prefixedId = generateId(schema.type(), schema.id()); this.idCache.update(prefixedId, schema); @@ -481,7 +490,7 @@ public CachedTypes cachedTypes() { } private static class CachedTypes - extends ConcurrentHashMap { + extends ConcurrentHashMap { private static final long serialVersionUID = -2215549791679355996L; } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java index 2bfbedd2ae..9ee7abe049 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java @@ -687,7 +687,7 @@ public static synchronized CoreOptions instance() { "memory.mode", "The memory mode used for query in HugeGraph.", disallowEmpty(), - "off-heap" + "disable" ); public static final ConfigOption MAX_MEMORY_CAPACITY = new ConfigOption<>( diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/MemoryManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/MemoryManager.java index b2ac195e3e..46b1311257 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/MemoryManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/MemoryManager.java @@ -77,7 +77,7 @@ public class MemoryManager { private final MemoryArbitrator memoryArbitrator; private final ExecutorService arbitrateExecutor; - private static MemoryMode MEMORY_MODE = MemoryMode.ENABLE_OFF_HEAP_MANAGEMENT; + private static MemoryMode MEMORY_MODE = MemoryMode.DISABLE_MEMORY_MANAGEMENT; private MemoryManager() { this.memoryArbitrator = new MemoryArbitratorImpl(this); @@ -89,6 +89,10 @@ private MemoryManager() { } public MemoryPool addQueryMemoryPool() { + if (MEMORY_MODE == MemoryMode.DISABLE_MEMORY_MANAGEMENT) { + return null; + } + int count = queryMemoryPools.size(); String poolName = QUERY_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR + diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/schema/SchemaElement.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/schema/SchemaElement.java index 966d3eed8a..250515a6e3 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/schema/SchemaElement.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/schema/SchemaElement.java @@ -24,6 +24,7 @@ import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.memory.consumer.OffHeapObject; import org.apache.hugegraph.type.Nameable; import org.apache.hugegraph.type.Typeable; import org.apache.hugegraph.type.define.SchemaStatus; @@ -58,9 +59,9 @@ public abstract class SchemaElement implements Nameable, Typeable, protected final HugeGraph graph; - private final Id id; private final String name; private final Userdata userdata; + private Id id; private SchemaStatus status; public SchemaElement(final HugeGraph graph, Id id, String name) { @@ -83,6 +84,12 @@ public Id id() { return this.id; } + public void convertIdToOnHeapIfNeeded() { + if (this.id instanceof OffHeapObject) { + this.id = (Id) ((OffHeapObject) this.id).zeroCopyReadFromByteBuf(); + } + } + public long longId() { return this.id.asLong(); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeEdge.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeEdge.java index ac23a7a1e4..041232e63b 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeEdge.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeEdge.java @@ -30,6 +30,7 @@ import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.serializer.BytesBuffer; import org.apache.hugegraph.backend.tx.GraphTransaction; +import org.apache.hugegraph.memory.consumer.OffHeapObject; import org.apache.hugegraph.memory.consumer.factory.PropertyFactory; import org.apache.hugegraph.perf.PerfUtil.Watched; import org.apache.hugegraph.schema.EdgeLabel; @@ -80,6 +81,12 @@ public HugeEdge(final HugeGraph graph, Id id, EdgeLabel label) { this.isOutEdge = true; } + public void convertIdToOnHeapIfNeeded() { + if (this.id instanceof OffHeapObject) { + this.id = (Id) ((OffHeapObject) this.id).zeroCopyReadFromByteBuf(); + } + } + @Override public HugeType type() { // NOTE: we optimize the edge type that let it include direction diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java index 29abc007e9..1772f32c33 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java @@ -40,6 +40,7 @@ import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.masterelection.StandardClusterRoleStore; +import org.apache.hugegraph.memory.consumer.OffHeapObject; import org.apache.hugegraph.memory.consumer.factory.PropertyFactory; import org.apache.hugegraph.perf.PerfUtil.Watched; import org.apache.hugegraph.schema.EdgeLabel; @@ -93,6 +94,13 @@ public HugeVertex(final HugeGraph graph, Id id, VertexLabel label) { } } + public void convertIdToOnHeapIfNeeded() { + if (this.id instanceof OffHeapObject) { + this.id = (Id) ((OffHeapObject) this.id).zeroCopyReadFromByteBuf(); + } + } + + @Override public HugeType type() { if (label != null && diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java index 8122c79080..2608de95d8 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java @@ -639,7 +639,7 @@ protected void checkVertexExist(Id vertexId, String name) { this.graph.vertex(vertexId); } catch (NotFoundException e) { throw new IllegalArgumentException(String.format( - "The %s with id '%s' does not exist", name, vertexId), e); + "The %s with id '%s' does not exist: %s", name, vertexId, e.getMessage()), e); } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index a21a23d71a..820eab74b6 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -112,18 +114,21 @@ public void start(String name) { new ContextCallable<>(() -> this.runAndDone(MemoryManager.getInstance() .getCorrespondingTaskMemoryPool( Thread.currentThread() - .getName()) - .findRootQueryPool())))); + .getName()))))); } } - private Void runAndDone(MemoryPool queryPool) { - MemoryPool currentTaskPool = queryPool.addChildPool("kout-consume-task"); - MemoryManager.getInstance() - .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), - (TaskMemoryPool) currentTaskPool); - MemoryPool currentOperationPool = - currentTaskPool.addChildPool("kout-consume-operation"); + private Void runAndDone(MemoryPool taskPool) { + MemoryPool currentTaskPool = null; + if (Objects.nonNull(taskPool)) { + currentTaskPool = taskPool.findRootQueryPool().addChildPool("kout-consume-task"); + MemoryManager.getInstance() + .bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(), + (TaskMemoryPool) currentTaskPool); + MemoryPool currentOperationPool = + currentTaskPool.addChildPool("kout-consume-operation"); + } + try { this.run(); } catch (Throwable e) { @@ -139,7 +144,9 @@ private Void runAndDone(MemoryPool queryPool) { } finally { this.done(); this.latch.countDown(); - currentTaskPool.releaseSelf("Complete kout consume task", false); + Optional.ofNullable(currentTaskPool) + .ifPresent(pool -> pool.releaseSelf("Complete kout consume task", + false)); } return null; }