diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
index 3d2935e669..8243b0360b 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
@@ -20,6 +20,7 @@
 package com.baidu.hugegraph.backend.cache;
 
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import org.slf4j.Logger;
@@ -45,6 +46,9 @@ public abstract class AbstractCache implements Cache {
     private final long capacity;
     private final long halfCapacity;
 
+    // For user attachment
+    private final AtomicReference<Object> attachment;
+
     public AbstractCache() {
         this(DEFAULT_SIZE);
     }
@@ -55,6 +59,7 @@ public AbstractCache(long capacity) {
         }
         this.capacity = capacity;
         this.halfCapacity = this.capacity >> 1;
+        this.attachment = new AtomicReference<>();
     }
 
     @Watched(prefix = "cache")
@@ -204,6 +209,19 @@ public final long capacity() {
         return this.capacity;
     }
 
+    @Override
+    public <T> T attachment(T object) {
+        this.attachment.compareAndSet(null, object);
+        return this.attachment();
+    }
+
+    @Override
+    public <T> T attachment() {
+        @SuppressWarnings("unchecked")
+        T attachment = (T) this.attachment.get();
+        return attachment;
+    }
+
     protected final long halfCapacity() {
         return this.halfCapacity;
     }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java
index fbbcf11b32..c8294af949 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/Cache.java
@@ -55,4 +55,8 @@ public interface Cache {
     public long hits();
 
     public long miss();
+
+    public <T> T attachment(T object);
+
+    public <T> T attachment();
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
index a8d2ed9575..f567940ea1 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
@@ -21,7 +21,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -29,9 +28,9 @@
 import com.baidu.hugegraph.backend.id.Id;
 import com.baidu.hugegraph.backend.id.IdGenerator;
 import com.baidu.hugegraph.backend.store.BackendStore;
+import com.baidu.hugegraph.backend.store.ram.IntObjectMap;
 import com.baidu.hugegraph.backend.tx.SchemaTransaction;
 import com.baidu.hugegraph.config.CoreOptions;
-import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.event.EventHub;
 import com.baidu.hugegraph.event.EventListener;
 import com.baidu.hugegraph.schema.SchemaElement;
@@ -44,17 +43,25 @@ public final class CachedSchemaTransaction extends SchemaTransaction {
 
     private final Cache idCache;
     private final Cache nameCache;
 
+    private final SchemaCaches<SchemaElement> arrayCaches;
+
     private EventListener storeEventListener;
     private EventListener cacheEventListener;
 
-    private static final Map<String, CachedTypes> CACHED_TYPES =
-            new ConcurrentHashMap<>();
-
     public CachedSchemaTransaction(HugeGraphParams graph, BackendStore store) {
         super(graph, store);
-        this.idCache = this.cache("schema-id");
-        this.nameCache = this.cache("schema-name");
+        final long capacity = graph.configuration()
+                                   .get(CoreOptions.SCHEMA_CACHE_CAPACITY);
+        this.idCache = this.cache("schema-id", capacity);
+        this.nameCache = this.cache("schema-name", capacity);
+
+        SchemaCaches<SchemaElement> attachment = this.idCache.attachment();
+        if (attachment == null) {
+            int acSize = (int) (capacity >> 3);
+            attachment = this.idCache.attachment(new SchemaCaches<>(acSize));
+        }
+        this.arrayCaches = attachment;
 
         this.listenChanges();
     }
@@ -69,23 +76,12 @@ public void close() {
         }
     }
 
-    private Cache cache(String prefix) {
-        HugeConfig conf = super.params().configuration();
-
+    private Cache cache(String prefix, long capacity) {
         final String name = prefix + "-" + this.graphName();
-        final long capacity = conf.get(CoreOptions.SCHEMA_CACHE_CAPACITY);
         // NOTE: must disable schema cache-expire due to getAllSchema()
         return CacheManager.instance().cache(name, capacity);
     }
 
-    private CachedTypes cachedTypes() {
-        String graph = this.params().name();
-        if (!CACHED_TYPES.containsKey(graph)) {
-            CACHED_TYPES.putIfAbsent(graph, new CachedTypes());
-        }
-        return CACHED_TYPES.get(graph);
-    }
-
     private void listenChanges() {
         // Listen store event: "store.init", "store.clear", ...
         Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
@@ -111,6 +107,8 @@ private void listenChanges() {
             if ("invalid".equals(args[0])) {
                 HugeType type = (HugeType) args[1];
                 Id id = (Id) args[2];
+                this.arrayCaches.remove(type, id);
+
                 id = generateId(type, id);
                 Object value = this.idCache.get(id);
                 if (value != null) {
@@ -139,7 +137,7 @@ private void listenChanges() {
     private void clearCache() {
         this.idCache.clear();
         this.nameCache.clear();
-        this.cachedTypes().clear();
+        this.arrayCaches.clear();
     }
 
     private void unlistenChanges() {
@@ -155,7 +153,7 @@ private void resetCachedAllIfReachedCapacity() {
         if (this.idCache.size() >= this.idCache.capacity()) {
             LOG.warn("Schema cache reached capacity({}): {}",
                      this.idCache.capacity(), this.idCache.size());
-            this.cachedTypes().clear();
+            this.arrayCaches.cachedTypes().clear();
         }
     }
 
@@ -175,9 +173,11 @@ protected void addSchema(SchemaElement schema) {
 
         this.resetCachedAllIfReachedCapacity();
 
+        // update id cache
         Id prefixedId = generateId(schema.type(), schema.id());
         this.idCache.update(prefixedId, schema);
 
+        // update name cache
         Id prefixedName = generateId(schema.type(), schema.name());
         this.nameCache.update(prefixedName, schema);
     }
@@ -185,6 +185,14 @@ protected void addSchema(SchemaElement schema) {
     @Override
     @SuppressWarnings("unchecked")
     protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
+        // try get from optimized array cache
+        if (id.number() && id.asLong() > 0) {
+            SchemaElement value = this.arrayCaches.get(type, id);
+            if (value != null) {
+                return (T) value;
+            }
+        }
+
         Id prefixedId = generateId(type, id);
         Object value = this.idCache.get(prefixedId);
         if (value == null) {
@@ -199,6 +207,12 @@ protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
                 this.nameCache.update(prefixedName, schema);
             }
         }
+
+        // update optimized array cache
+        if (value != null && id.number() && id.asLong() > 0) {
+            this.arrayCaches.set(type, id, (SchemaElement) value);
+        }
+
         return (T) value;
     }
 
@@ -236,11 +250,15 @@ protected void removeSchema(SchemaElement schema) {
             Id prefixedName = generateId(schema.type(), schema.name());
             this.nameCache.invalidate(prefixedName);
         }
+
+        // remove from optimized array cache
+        this.arrayCaches.remove(schema.type(), schema.id());
     }
 
     @Override
     protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
-        Boolean cachedAll = this.cachedTypes().getOrDefault(type, false);
+        Boolean cachedAll = this.arrayCaches.cachedTypes()
+                                            .getOrDefault(type, false);
         if (cachedAll) {
             List<T> results = new ArrayList<>();
             // Get from cache
@@ -264,12 +282,120 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
                     Id prefixedName = generateId(schema.type(), schema.name());
                     this.nameCache.update(prefixedName, schema);
                 }
-                this.cachedTypes().putIfAbsent(type, true);
+                this.arrayCaches.cachedTypes().putIfAbsent(type, true);
             }
             return results;
         }
     }
 
+    private static final class SchemaCaches<V extends SchemaElement> {
+
+        private final int size;
+
+        private final IntObjectMap<V> pks;
+        private final IntObjectMap<V> vls;
+        private final IntObjectMap<V> els;
+        private final IntObjectMap<V> ils;
+
+        private final CachedTypes cachedTypes;
+
+        public SchemaCaches(int size) {
+            // TODO: improve size of each type for optimized array cache
+            this.size = size;
+
+            this.pks = new IntObjectMap<>(size);
+            this.vls = new IntObjectMap<>(size);
+            this.els = new IntObjectMap<>(size);
+            this.ils = new IntObjectMap<>(size);
+
+            this.cachedTypes = new CachedTypes();
+        }
+
+        public V get(HugeType type, Id id) {
+            assert id.number() && id.asLong() > 0 : id;
+            int key = (int) id.asLong();
+            if (key >= this.size) {
+                return null;
+            }
+            switch (type) {
+                case PROPERTY_KEY:
+                    return this.pks.get(key);
+                case VERTEX_LABEL:
+                    return this.vls.get(key);
+                case EDGE_LABEL:
+                    return this.els.get(key);
+                case INDEX_LABEL:
+                    return this.ils.get(key);
+                default:
+                    return null;
+            }
+        }
+
+        public void set(HugeType type, Id id, V value) {
+            assert id.number() && id.asLong() > 0 : id;
+            int key = (int) id.asLong();
+            if (key >= this.size) {
+                return;
+            }
+            switch (type) {
+                case PROPERTY_KEY:
+                    this.pks.set(key, value);
+                    break;
+                case VERTEX_LABEL:
+                    this.vls.set(key, value);
+                    break;
+                case EDGE_LABEL:
+                    this.els.set(key, value);
+                    break;
+                case INDEX_LABEL:
+                    this.ils.set(key, value);
+                    break;
+                default:
+                    // pass
+                    break;
+            }
+        }
+
+        public void remove(HugeType type, Id id) {
+            assert id.number() && id.asLong() > 0 : id;
+            int key = (int) id.asLong();
+            V value = null;
+            if (key >= this.size) {
+                return;
+            }
+            switch (type) {
+                case PROPERTY_KEY:
+                    this.pks.set(key, value);
+                    break;
+                case VERTEX_LABEL:
+                    this.vls.set(key, value);
+                    break;
+                case EDGE_LABEL:
+                    this.els.set(key, value);
+                    break;
+                case INDEX_LABEL:
+                    this.ils.set(key, value);
+                    break;
+                default:
+                    // pass
+                    break;
+            }
+        }
+
+        public void clear() {
+            this.pks.clear();
+            this.vls.clear();
+            this.els.clear();
+            this.ils.clear();
+
+            this.cachedTypes.clear();
+        }
+
+        public CachedTypes cachedTypes() {
+            return this.cachedTypes;
+        }
+    }
+
     private static class CachedTypes extends ConcurrentHashMap<HugeType, Boolean> {
diff --git a/hugegraph-test/src/main/resources/hugegraph.properties b/hugegraph-test/src/main/resources/hugegraph.properties
index 5a148b2caa..2000281d16 100644
--- a/hugegraph-test/src/main/resources/hugegraph.properties
+++ b/hugegraph-test/src/main/resources/hugegraph.properties
@@ -17,6 +17,11 @@ query.batch_size=4
 query.page_size=2
 query.index_intersect_threshold=2
 
+#schema.cache_capacity=1000000
+#query.ramtable_enable=true
+#query.ramtable_vertices_capacity=1800
+#query.ramtable_edges_capacity=1200
+
 # cassandra backend config
 cassandra.host=127.0.0.1
 cassandra.port=9042
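Note: the patch imports com.baidu.hugegraph.backend.store.ram.IntObjectMap but does not include that class, so the snippet below is only a hedged sketch of the fixed-size, array-backed int-to-object map that SchemaCaches appears to rely on (a capacity constructor plus get/set/clear). The class name IntObjectMapSketch and its layout are illustrative assumptions, not the repository's actual implementation.

// Hypothetical stand-in for the imported IntObjectMap: a bounded array of
// values indexed directly by small positive numeric ids. SchemaCaches guards
// the "key >= size" case itself, so out-of-range keys never reach this class.
import java.util.Arrays;

public final class IntObjectMapSketch<V> {

    private final Object[] array;

    public IntObjectMapSketch(int size) {
        // Fixed capacity chosen by the caller (SchemaCaches uses capacity >> 3)
        this.array = new Object[size];
    }

    @SuppressWarnings("unchecked")
    public V get(int key) {
        return (V) this.array[key];
    }

    public void set(int key, V value) {
        this.array[key] = value;
    }

    public void clear() {
        Arrays.fill(this.array, null);
    }
}

With such a map, a schema lookup by a small positive numeric id in getSchema() reduces to a bounds check plus one array read, which is why arrayCaches is consulted before the LRU-style idCache.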