Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve CachedGraphTransaction perf #1743

Merged
merged 1 commit into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public <V> Cache<Id, V> cache(String name) {
public <V> Cache<Id, V> cache(String name, long capacity) {
if (!this.caches.containsKey(name)) {
this.caches.putIfAbsent(name, new RamCache(capacity));
LOG.info("Init RamCache for '{}' with capacity {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use info level for it (or use debug)? For example, for the first time the cache is filled.

Also, the name RamCache does not contrast clearly with OffHeapCache (level cache also has this problem).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a frequent action, and it only logs at the 3 cache entry points (cache/offheapCache/levelCache).

RamCache may be renamed HeapCache in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, using HeapCache or InHeapCache would be more readable.

name, capacity);
}
@SuppressWarnings("unchecked")
Cache<Id, V> cache = (Cache<Id, V>) this.caches.get(name);
Expand All @@ -124,6 +126,8 @@ public <V> Cache<Id, V> offheapCache(HugeGraph graph, String name,
if (!this.caches.containsKey(name)) {
OffheapCache cache = new OffheapCache(graph, capacity, avgElemSize);
this.caches.putIfAbsent(name, cache);
LOG.info("Init OffheapCache for '{}' with capacity {}",
name, capacity);
}
@SuppressWarnings("unchecked")
Cache<Id, V> cache = (Cache<Id, V>) this.caches.get(name);
Expand All @@ -140,6 +144,8 @@ public <V> Cache<Id, V> levelCache(HugeGraph graph, String name,
OffheapCache cache2 = new OffheapCache(graph, capacity2,
avgElemSize);
this.caches.putIfAbsent(name, new LevelCache(cache1, cache2));
LOG.info("Init LevelCache for '{}' with capacity {}:{}",
name, capacity1, capacity2);
}
@SuppressWarnings("unchecked")
Cache<Id, V> cache = (Cache<Id, V>) this.caches.get(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,51 @@ private void clearCache(HugeType type, boolean notify) {
}
}

/**
 * Whether the vertex cache is enabled.
 * The cache is treated as enabled only when a positive capacity is configured.
 */
private boolean enableCacheVertex() {
    long capacity = this.verticesCache.capacity();
    return capacity > 0L;
}

/**
 * Whether the edge cache is enabled.
 * The cache is treated as enabled only when a positive capacity is configured.
 */
private boolean enableCacheEdge() {
    long capacity = this.edgesCache.capacity();
    return capacity > 0L;
}

/**
 * Whether the given vertex should be cached.
 * Vertices whose sub-property count exceeds the limit are skipped so a
 * single large vertex can't dominate the cache.
 *
 * @param vertex the vertex to check
 * @return true if the vertex is small enough to be cached
 */
private boolean needCacheVertex(HugeVertex vertex) {
    int subProps = vertex.sizeOfSubProperties();
    return subProps <= MAX_CACHE_PROPS_PER_VERTEX;
}

@Override
@Watched(prefix = "graphcache")
protected final Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
if (!query.ids().isEmpty() && query.conditions().isEmpty()) {
if (this.enableCacheVertex() &&
query.idsSize() > 0 && query.conditionsSize() == 0) {
return this.queryVerticesByIds((IdQuery) query);
} else {
return super.queryVerticesFromBackend(query);
}
}

@Watched(prefix = "graphcache")
private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
if (query.idsSize() == 1) {
Id vertexId = query.ids().iterator().next();
HugeVertex vertex = (HugeVertex) this.verticesCache.get(vertexId);
if (vertex != null) {
if (!vertex.expired()) {
return QueryResults.iterator(vertex);
}
this.verticesCache.invalidate(vertexId);
}
Iterator<HugeVertex> rs = super.queryVerticesFromBackend(query);
vertex = QueryResults.one(rs);
if (vertex == null) {
return QueryResults.emptyIterator();
}
if (needCacheVertex(vertex)) {
this.verticesCache.update(vertex.id(), vertex);
}
return QueryResults.iterator(vertex);
}

IdQuery newQuery = new IdQuery(HugeType.VERTEX, query);
List<HugeVertex> vertices = new ArrayList<>();
for (Id vertexId : query.ids()) {
Expand Down Expand Up @@ -254,11 +289,10 @@ private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
// Generally there are not too much data with id query
ListIterator<HugeVertex> listIterator = QueryResults.toList(rs);
for (HugeVertex vertex : listIterator.list()) {
if (vertex.sizeOfSubProperties() > MAX_CACHE_PROPS_PER_VERTEX) {
// Skip large vertex
continue;
// Skip large vertex
if (needCacheVertex(vertex)) {
this.verticesCache.update(vertex.id(), vertex);
}
this.verticesCache.update(vertex.id(), vertex);
}
results.extend(listIterator);
}
Expand All @@ -267,14 +301,15 @@ private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
}

@Override
@Watched
@Watched(prefix = "graphcache")
protected final Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
RamTable ramtable = this.params().ramtable();
if (ramtable != null && ramtable.matched(query)) {
return ramtable.query(query);
}

if (query.empty() || query.paging() || query.bigCapacity()) {
if (!this.enableCacheEdge() || query.empty() ||
query.paging() || query.bigCapacity()) {
// Query all edges or query edges in paging, don't cache it
return super.queryEdgesFromBackend(query);
}
Expand Down Expand Up @@ -321,6 +356,7 @@ protected final Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
}

@Override
@Watched(prefix = "graphcache")
protected final void commitMutation2Backend(BackendMutation... mutations) {
// Collect changes before commit
Collection<HugeVertex> updates = this.verticesInTxUpdated();
Expand All @@ -333,28 +369,33 @@ protected final void commitMutation2Backend(BackendMutation... mutations) {
try {
super.commitMutation2Backend(mutations);
// Update vertex cache
for (HugeVertex vertex : updates) {
vertexIds[vertexOffset++] = vertex.id();
if (vertex.sizeOfSubProperties() > MAX_CACHE_PROPS_PER_VERTEX) {
// Skip large vertex
this.verticesCache.invalidate(vertex.id());
continue;
if (this.enableCacheVertex()) {
for (HugeVertex vertex : updates) {
vertexIds[vertexOffset++] = vertex.id();
if (needCacheVertex(vertex)) {
// Update cache
this.verticesCache.updateIfPresent(vertex.id(), vertex);
} else {
// Skip large vertex
this.verticesCache.invalidate(vertex.id());
}
}
this.verticesCache.updateIfPresent(vertex.id(), vertex);
}
} finally {
// Update removed vertex in cache whatever success or fail
for (HugeVertex vertex : deletions) {
vertexIds[vertexOffset++] = vertex.id();
this.verticesCache.invalidate(vertex.id());
}
if (vertexOffset > 0) {
this.notifyChanges(Cache.ACTION_INVALIDED,
HugeType.VERTEX, vertexIds);
if (this.enableCacheVertex()) {
for (HugeVertex vertex : deletions) {
vertexIds[vertexOffset++] = vertex.id();
this.verticesCache.invalidate(vertex.id());
}
if (vertexOffset > 0) {
this.notifyChanges(Cache.ACTION_INVALIDED,
HugeType.VERTEX, vertexIds);
}
}

// Update edge cache if any edges change
if (edgesInTxSize > 0) {
if (edgesInTxSize > 0 && this.enableCacheEdge()) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down