Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split options graph.cache_xx into [vertex|edge].cache_xx #56

Merged
merged 1 commit into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;

import com.baidu.hugegraph.util.Log;

public class CacheManager {
Expand Down Expand Up @@ -56,14 +57,24 @@ private TimerTask scheduleTimer(float period) {
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
this.tick();
} catch (Throwable e) {
LOG.warn("An exception occurred when running tick", e);
}
}

private void tick() {
for (Entry<String, Cache> entry : caches().entrySet()) {
LOG.debug("Cache '{}' expiration tick", entry.getKey());
entry.getValue().tick();
}
}
};
// The period in seconds

// Schedule task with the period in seconds
this.timer.schedule(task, 0, (long) (period * 1000.0));

return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,27 @@

public class CachedGraphTransaction extends GraphTransaction {

private final static int MAX_CACHE_EDGES_PER_QUERY = 100;

private final Cache verticesCache;
private final Cache edgesCache;

public CachedGraphTransaction(HugeGraph graph, BackendStore store) {
super(graph, store);
this.verticesCache = this.cache("vertex");
this.edgesCache = this.cache("edge");
}

private Cache cache(String prefix) {
HugeConfig conf = super.graph().configuration();
HugeConfig conf = graph.configuration();

final String name = prefix + "-" + super.graph().name();
final int capacity = conf.get(CoreOptions.GRAPH_CACHE_CAPACITY);
final int expire = conf.get(CoreOptions.GRAPH_CACHE_EXPIRE);
int capacity = conf.get(CoreOptions.VERTEX_CACHE_CAPACITY);
int expire = conf.get(CoreOptions.VERTEX_CACHE_EXPIRE);
this.verticesCache = this.cache("vertex", capacity, expire);

capacity = conf.get(CoreOptions.EDGE_CACHE_CAPACITY);
expire = conf.get(CoreOptions.EDGE_CACHE_EXPIRE);
this.edgesCache = this.cache("edge", capacity, expire);
}

private Cache cache(String prefix, int capacity, long expire) {
String name = prefix + "-" + super.graph().name();
Cache cache = CacheManager.instance().cache(name, capacity);
cache.expire(expire);
return cache;
Expand Down Expand Up @@ -101,12 +106,16 @@ public Iterator<Edge> queryEdges(Query query) {
return super.queryEdges(query);
}

Object result = this.edgesCache.getOrFetch(new QueryId(query), id -> {
// Iterator can't be cached, caching list instead
return ImmutableList.copyOf(super.queryEdges(query));
});
Id id = new QueryId(query);
@SuppressWarnings("unchecked")
List<Edge> edges = (List<Edge>) result;
List<Edge> edges = (List<Edge>) this.edgesCache.get(id);
if (edges == null) {
// Iterator can't be cached, caching list instead
edges = ImmutableList.copyOf(super.queryEdges(query));
if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
this.edgesCache.update(id, edges);
}
}
return edges.iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -49,23 +49,24 @@ public class RamCache implements Cache {
// Default expire time(ms)
private volatile long expire = 0L;

private final KeyLock keyLock;
// NOTE: the count in number of items, not in bytes
private final int capacity;

// Implement LRU cache
private final Map<Id, LinkNode<Id, Object>> map;
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
private final LinkedQueueNonBigLock<Id, Object> queue;

private final KeyLock keyLock;

public RamCache() {
this(DEFAULT_SIZE);
}

// NOTE: count in number of items, not in bytes
public RamCache(int capacity) {
this.keyLock = new KeyLock();

if (capacity < 8) {
capacity = 8;
if (capacity < 1) {
capacity = 1;
}
this.capacity = capacity;

Expand Down Expand Up @@ -287,25 +288,25 @@ public long expire() {

@Override
public void tick() {
if (this.expire <= 0) {
long expireTime = this.expire;
if (expireTime <= 0) {
return;
}

int expireItems = 0;
long current = now();
List<Id> expireItems = new LinkedList<>();
for (LinkNode<Id, Object> node : this.map.values()) {
if (current - node.time() > this.expire) {
expireItems.add(node.key());
if (current - node.time() > expireTime) {
// Remove item while iterating map (it must be ConcurrentMap)
this.remove(node.key());
expireItems++;
}
}

LOG.debug("Cache expire items: {} (expire {}ms)",
expireItems.size(), this.expire);
for (Id id : expireItems) {
this.remove(id);
if (expireItems > 0) {
LOG.info("Cache expired {} items cost {}ms (size {}, expire {}ms)",
expireItems, now() - current, this.size(), expireTime);
}
LOG.debug("Cache expired items: {} (size {})",
expireItems.size(), size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,39 @@ public static synchronized CoreOptions instance() {
public static final ConfigOption<Integer> SCHEMA_CACHE_CAPACITY =
new ConfigOption<>(
"schema.cache_capacity",
"The max cache size(items) of schema data.",
"The max cache size(items) of schema cache.",
rangeInt(1, Integer.MAX_VALUE),
(1024 * 1024 * 1)
100000
);

public static final ConfigOption<Integer> GRAPH_CACHE_CAPACITY =
public static final ConfigOption<Integer> VERTEX_CACHE_CAPACITY =
new ConfigOption<>(
"graph.cache_capacity",
"The max cache size(items) of graph data(vertex/edge).",
"vertex.cache_capacity",
"The max cache size(items) of vertex cache.",
rangeInt(1, Integer.MAX_VALUE),
(1024 * 1024 * 10)
(1000 * 1000 * 10)
);

public static final ConfigOption<Integer> GRAPH_CACHE_EXPIRE =
public static final ConfigOption<Integer> VERTEX_CACHE_EXPIRE =
new ConfigOption<>(
"graph.cache_expire",
"The expire time in seconds of graph data(vertex/edge).",
"vertex.cache_expire",
"The expire time in seconds of vertex cache.",
rangeInt(0, Integer.MAX_VALUE),
(60 * 10)
);

public static final ConfigOption<Integer> EDGE_CACHE_CAPACITY =
new ConfigOption<>(
"edge.cache_capacity",
"The max cache size(items) of edge cache.",
rangeInt(1, Integer.MAX_VALUE),
(1000 * 1000 * 1)
);

public static final ConfigOption<Integer> EDGE_CACHE_EXPIRE =
new ConfigOption<>(
"edge.cache_expire",
"The expire time in seconds of edge cache.",
rangeInt(0, Integer.MAX_VALUE),
(60 * 10)
);
Expand Down