Skip to content

Commit

Permalink
cache use on-heap id && adopt for disabled mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna committed Dec 11, 2024
1 parent a342870 commit db3ee80
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.hugegraph.HugeGraph;
Expand Down Expand Up @@ -97,11 +98,13 @@ public String get(@Context GraphManager manager,
graph, source, direction, edgeLabel, depth,
nearest, maxDegree, capacity, limit);
MemoryPool queryPool = MemoryManager.getInstance().addQueryMemoryPool();
MemoryPool currentTaskPool = queryPool.addChildPool("kout-main-task");
MemoryManager.getInstance()
.bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(),
(TaskMemoryPool) currentTaskPool);
MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation");
Optional.ofNullable(queryPool).ifPresent(pool -> {
MemoryPool currentTaskPool = pool.addChildPool("kout-main-task");
MemoryManager.getInstance()
.bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(),

Check warning on line 104 in hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java#L102-L104

Added lines #L102 - L104 were not covered by tests
(TaskMemoryPool) currentTaskPool);
MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation");
});

Check warning on line 107 in hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java#L106-L107

Added lines #L106 - L107 were not covered by tests

try {
ApiMeasurer measure = new ApiMeasurer();
Expand All @@ -125,7 +128,8 @@ public String get(@Context GraphManager manager,
}
return manager.serializer(g, measure.measures()).writeList("vertices", ids);
} finally {
queryPool.releaseSelf("Complete kout query", false);
Optional.ofNullable(queryPool)
.ifPresent(pool -> MemoryManager.getInstance().gcQueryMemoryPool(pool));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
return QueryResults.emptyIterator();
}
if (needCacheVertex(vertex)) {
vertex.convertIdToOnHeapIfNeeded();
this.verticesCache.update(vertex.id(), vertex);
}
return QueryResults.iterator(vertex);
Expand Down Expand Up @@ -295,6 +296,7 @@ private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
for (HugeVertex vertex : listIterator.list()) {
// Skip large vertex
if (needCacheVertex(vertex)) {
vertex.convertIdToOnHeapIfNeeded();
this.verticesCache.update(vertex.id(), vertex);
}
}
Expand Down Expand Up @@ -353,6 +355,7 @@ protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
if (edges.isEmpty()) {
this.edgesCache.update(cacheKey, Collections.emptyList());
} else if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
edges.forEach(HugeEdge::convertIdToOnHeapIfNeeded);
this.edgesCache.update(cacheKey, edges);
}

Expand All @@ -378,6 +381,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) {
vertexIds[vertexOffset++] = vertex.id();
if (needCacheVertex(vertex)) {
// Update cache
vertex.convertIdToOnHeapIfNeeded();
this.verticesCache.updateIfPresent(vertex.id(), vertex);
} else {
// Skip large vertex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ private void clearCache(boolean notify) {
private void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();

// convert schema.id to on heap if needed.
schema.convertIdToOnHeapIfNeeded();

// update id cache
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
Expand Down Expand Up @@ -204,14 +207,20 @@ private void invalidateCache(HugeType type, Id id) {
this.arrayCaches.remove(type, id);
}

/**
* Ids used in cache must be on-heap object
*/
private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
return new IdGenerator.StringId(type.string() + "-" + id.asString());
}

/**
* Ids used in cache must be on-heap object
*/
private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
return new IdGenerator.StringId(type.string() + "-" + name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.collect.ImmutableSet;

public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {

private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;

Expand All @@ -51,8 +52,8 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
private EventListener cacheEventListener;

public CachedSchemaTransactionV2(MetaDriver metaDriver,
String cluster,
HugeGraphParams graphParams) {
String cluster,
HugeGraphParams graphParams) {
super(metaDriver, cluster, graphParams);

final long capacity = graphParams.configuration()
Expand Down Expand Up @@ -223,6 +224,9 @@ protected void addSchema(SchemaElement schema) {
private void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();

// convert schema.id to on heap if needed.
schema.convertIdToOnHeapIfNeeded();

Check warning on line 228 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L228

Added line #L228 was not covered by tests

// update id cache
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
Expand Down Expand Up @@ -268,10 +272,12 @@ protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();
// convert schema.id to on heap if needed.
SchemaElement schema = (SchemaElement) value;
schema.convertIdToOnHeapIfNeeded();

Check warning on line 277 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L276-L277

Added lines #L276 - L277 were not covered by tests

this.idCache.update(prefixedId, value);
this.idCache.update(prefixedId, schema);

Check warning on line 279 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L279

Added line #L279 was not covered by tests

SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}
Expand Down Expand Up @@ -321,6 +327,9 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
if (results.size() <= free) {
// Update cache
for (T schema : results) {
// convert schema.id to on heap if needed.
schema.convertIdToOnHeapIfNeeded();

Check warning on line 331 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L331

Added line #L331 was not covered by tests

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Expand Down Expand Up @@ -481,7 +490,7 @@ public CachedTypes cachedTypes() {
}

private static class CachedTypes
extends ConcurrentHashMap<HugeType, Boolean> {
extends ConcurrentHashMap<HugeType, Boolean> {

private static final long serialVersionUID = -2215549791679355996L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ public static synchronized CoreOptions instance() {
"memory.mode",
"The memory mode used for query in HugeGraph.",
disallowEmpty(),
"off-heap"
"disable"
);

public static final ConfigOption<Long> MAX_MEMORY_CAPACITY = new ConfigOption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class MemoryManager {
private final MemoryArbitrator memoryArbitrator;
private final ExecutorService arbitrateExecutor;

private static MemoryMode MEMORY_MODE = MemoryMode.ENABLE_OFF_HEAP_MANAGEMENT;
private static MemoryMode MEMORY_MODE = MemoryMode.DISABLE_MEMORY_MANAGEMENT;

private MemoryManager() {
this.memoryArbitrator = new MemoryArbitratorImpl(this);
Expand All @@ -89,6 +89,10 @@ private MemoryManager() {
}

public MemoryPool addQueryMemoryPool() {
if (MEMORY_MODE == MemoryMode.DISABLE_MEMORY_MANAGEMENT) {
return null;
}

int count = queryMemoryPools.size();
String poolName =
QUERY_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.type.Nameable;
import org.apache.hugegraph.type.Typeable;
import org.apache.hugegraph.type.define.SchemaStatus;
Expand Down Expand Up @@ -58,9 +59,9 @@ public abstract class SchemaElement implements Nameable, Typeable,

protected final HugeGraph graph;

private final Id id;
private final String name;
private final Userdata userdata;
private Id id;
private SchemaStatus status;

public SchemaElement(final HugeGraph graph, Id id, String name) {
Expand All @@ -83,6 +84,12 @@ public Id id() {
return this.id;
}

public void convertIdToOnHeapIfNeeded() {
if (this.id instanceof OffHeapObject) {
this.id = (Id) ((OffHeapObject) this.id).zeroCopyReadFromByteBuf();

Check warning on line 89 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/schema/SchemaElement.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/schema/SchemaElement.java#L89

Added line #L89 was not covered by tests
}
}

public long longId() {
return this.id.asLong();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.consumer.factory.PropertyFactory;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.EdgeLabel;
Expand Down Expand Up @@ -80,6 +81,12 @@ public HugeEdge(final HugeGraph graph, Id id, EdgeLabel label) {
this.isOutEdge = true;
}

public void convertIdToOnHeapIfNeeded() {
if (this.id instanceof OffHeapObject) {
this.id = (Id) ((OffHeapObject) this.id).zeroCopyReadFromByteBuf();

Check warning on line 86 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeEdge.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeEdge.java#L86

Added line #L86 was not covered by tests
}
}

@Override
public HugeType type() {
// NOTE: we optimize the edge type that let it include direction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.masterelection.StandardClusterRoleStore;
import org.apache.hugegraph.memory.consumer.OffHeapObject;
import org.apache.hugegraph.memory.consumer.factory.PropertyFactory;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.EdgeLabel;
Expand Down Expand Up @@ -93,6 +94,13 @@ public HugeVertex(final HugeGraph graph, Id id, VertexLabel label) {
}
}

public void convertIdToOnHeapIfNeeded() {
if (this.id instanceof OffHeapObject) {
this.id = (Id) ((OffHeapObject) this.id).zeroCopyReadFromByteBuf();

Check warning on line 99 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/structure/HugeVertex.java#L99

Added line #L99 was not covered by tests
}
}


@Override
public HugeType type() {
if (label != null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ protected void checkVertexExist(Id vertexId, String name) {
this.graph.vertex(vertexId);
} catch (NotFoundException e) {
throw new IllegalArgumentException(String.format(
"The %s with id '%s' does not exist", name, vertexId), e);
"The %s with id '%s' does not exist: %s", name, vertexId, e.getMessage()), e);

Check warning on line 642 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java#L642

Added line #L642 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -112,18 +114,21 @@ public void start(String name) {
new ContextCallable<>(() -> this.runAndDone(MemoryManager.getInstance()
.getCorrespondingTaskMemoryPool(
Thread.currentThread()
.getName())
.findRootQueryPool()))));
.getName())))));
}
}

private Void runAndDone(MemoryPool queryPool) {
MemoryPool currentTaskPool = queryPool.addChildPool("kout-consume-task");
MemoryManager.getInstance()
.bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(),
(TaskMemoryPool) currentTaskPool);
MemoryPool currentOperationPool =
currentTaskPool.addChildPool("kout-consume-operation");
private Void runAndDone(MemoryPool taskPool) {
MemoryPool currentTaskPool = null;
if (Objects.nonNull(taskPool)) {
currentTaskPool = taskPool.findRootQueryPool().addChildPool("kout-consume-task");
MemoryManager.getInstance()
.bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(),

Check warning on line 126 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java#L124-L126

Added lines #L124 - L126 were not covered by tests
(TaskMemoryPool) currentTaskPool);
MemoryPool currentOperationPool =
currentTaskPool.addChildPool("kout-consume-operation");

Check warning on line 129 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java#L128-L129

Added lines #L128 - L129 were not covered by tests
}

try {
this.run();
} catch (Throwable e) {
Expand All @@ -139,7 +144,9 @@ private Void runAndDone(MemoryPool queryPool) {
} finally {
this.done();
this.latch.countDown();
currentTaskPool.releaseSelf("Complete kout consume task", false);
Optional.ofNullable(currentTaskPool)
.ifPresent(pool -> pool.releaseSelf("Complete kout consume task",
false));
}
return null;
}
Expand Down

0 comments on commit db3ee80

Please sign in to comment.