Skip to content

Commit

Permalink
Split options graph.cache_xx into [vertex|edge].cache_xx
Browse files Browse the repository at this point in the history
improve #55

Change-Id: Ice226f047acc47a2f497fc774b34d0c5f42ebcc9
  • Loading branch information
javeme committed Sep 17, 2018
1 parent 95342c8 commit 7fef41c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;

import com.baidu.hugegraph.util.Log;

public class CacheManager {
Expand Down Expand Up @@ -56,14 +57,24 @@ private TimerTask scheduleTimer(float period) {
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
this.tick();
} catch (Throwable e) {
LOG.warn("An exception occurred when running tick", e);
}
}

private void tick() {
for (Entry<String, Cache> entry : caches().entrySet()) {
LOG.debug("Cache '{}' expiration tick", entry.getKey());
entry.getValue().tick();
}
}
};
// The period in seconds

// Schedule task with the period in seconds
this.timer.schedule(task, 0, (long) (period * 1000.0));

return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,27 @@

public class CachedGraphTransaction extends GraphTransaction {

private final static int MAX_CACHE_EDGES_PER_QUERY = 100;

private final Cache verticesCache;
private final Cache edgesCache;

public CachedGraphTransaction(HugeGraph graph, BackendStore store) {
super(graph, store);
this.verticesCache = this.cache("vertex");
this.edgesCache = this.cache("edge");
}

private Cache cache(String prefix) {
HugeConfig conf = super.graph().configuration();
HugeConfig conf = graph.configuration();

final String name = prefix + "-" + super.graph().name();
final int capacity = conf.get(CoreOptions.GRAPH_CACHE_CAPACITY);
final int expire = conf.get(CoreOptions.GRAPH_CACHE_EXPIRE);
int capacity = conf.get(CoreOptions.VERTEX_CACHE_CAPACITY);
int expire = conf.get(CoreOptions.VERTEX_CACHE_EXPIRE);
this.verticesCache = this.cache("vertex", capacity, expire);

capacity = conf.get(CoreOptions.EDGE_CACHE_CAPACITY);
expire = conf.get(CoreOptions.EDGE_CACHE_EXPIRE);
this.edgesCache = this.cache("edge", capacity, expire);
}

private Cache cache(String prefix, int capacity, long expire) {
String name = prefix + "-" + super.graph().name();
Cache cache = CacheManager.instance().cache(name, capacity);
cache.expire(expire);
return cache;
Expand Down Expand Up @@ -101,12 +106,16 @@ public Iterator<Edge> queryEdges(Query query) {
return super.queryEdges(query);
}

Object result = this.edgesCache.getOrFetch(new QueryId(query), id -> {
// Iterator can't be cached, caching list instead
return ImmutableList.copyOf(super.queryEdges(query));
});
Id id = new QueryId(query);
@SuppressWarnings("unchecked")
List<Edge> edges = (List<Edge>) result;
List<Edge> edges = (List<Edge>) this.edgesCache.get(id);
if (edges == null) {
// Iterator can't be cached, caching list instead
edges = ImmutableList.copyOf(super.queryEdges(query));
if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
this.edgesCache.update(id, edges);
}
}
return edges.iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -49,23 +49,24 @@ public class RamCache implements Cache {
// Default expire time(ms)
private volatile long expire = 0L;

private final KeyLock keyLock;
// NOTE: the count in number of items, not in bytes
private final int capacity;

// Implement LRU cache
private final Map<Id, LinkNode<Id, Object>> map;
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
private final LinkedQueueNonBigLock<Id, Object> queue;

private final KeyLock keyLock;

public RamCache() {
this(DEFAULT_SIZE);
}

// NOTE: count in number of items, not in bytes
public RamCache(int capacity) {
this.keyLock = new KeyLock();

if (capacity < 8) {
capacity = 8;
if (capacity < 1) {
capacity = 1;
}
this.capacity = capacity;

Expand Down Expand Up @@ -287,25 +288,25 @@ public long expire() {

@Override
public void tick() {
if (this.expire <= 0) {
long expireTime = this.expire;
if (expireTime <= 0) {
return;
}

int expireItems = 0;
long current = now();
List<Id> expireItems = new LinkedList<>();
for (LinkNode<Id, Object> node : this.map.values()) {
if (current - node.time() > this.expire) {
expireItems.add(node.key());
if (current - node.time() > expireTime) {
// Remove item while iterating map (it must be ConcurrentMap)
this.remove(node.key());
expireItems++;
}
}

LOG.debug("Cache expire items: {} (expire {}ms)",
expireItems.size(), this.expire);
for (Id id : expireItems) {
this.remove(id);
if (expireItems > 0) {
LOG.info("Cache expired {} items cost {}ms (size {}, expire {}ms)",
expireItems, now() - current, this.size(), expireTime);
}
LOG.debug("Cache expired items: {} (size {})",
expireItems.size(), size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,39 @@ public static synchronized CoreOptions instance() {
public static final ConfigOption<Integer> SCHEMA_CACHE_CAPACITY =
new ConfigOption<>(
"schema.cache_capacity",
"The max cache size(items) of schema data.",
"The max cache size(items) of schema cache.",
rangeInt(1, Integer.MAX_VALUE),
(1024 * 1024 * 1)
100000
);

public static final ConfigOption<Integer> GRAPH_CACHE_CAPACITY =
public static final ConfigOption<Integer> VERTEX_CACHE_CAPACITY =
new ConfigOption<>(
"graph.cache_capacity",
"The max cache size(items) of graph data(vertex/edge).",
"vertex.cache_capacity",
"The max cache size(items) of vertex cache.",
rangeInt(1, Integer.MAX_VALUE),
(1024 * 1024 * 10)
(1000 * 1000 * 10)
);

public static final ConfigOption<Integer> GRAPH_CACHE_EXPIRE =
public static final ConfigOption<Integer> VERTEX_CACHE_EXPIRE =
new ConfigOption<>(
"graph.cache_expire",
"The expire time in seconds of graph data(vertex/edge).",
"vertex.cache_expire",
"The expire time in seconds of vertex cache.",
rangeInt(0, Integer.MAX_VALUE),
(60 * 10)
);

public static final ConfigOption<Integer> EDGE_CACHE_CAPACITY =
new ConfigOption<>(
"edge.cache_capacity",
"The max cache size(items) of edge cache.",
rangeInt(1, Integer.MAX_VALUE),
(1000 * 1000 * 1)
);

public static final ConfigOption<Integer> EDGE_CACHE_EXPIRE =
new ConfigOption<>(
"edge.cache_expire",
"The expire time in seconds of edge cache.",
rangeInt(0, Integer.MAX_VALUE),
(60 * 10)
);
Expand Down

0 comments on commit 7fef41c

Please sign in to comment.