Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prepare for mapdb backend #357

Merged
merged 3 commits into from
Mar 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.baidu.hugegraph.backend.cache.Cache;
import com.baidu.hugegraph.backend.cache.CacheManager;
import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo;
import com.baidu.hugegraph.backend.store.memory.InMemoryDBStoreProvider;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.exception.NotSupportException;
Expand Down Expand Up @@ -189,7 +188,9 @@ private void loadGraph(String name, String path) {
private void checkBackendVersionOrExit() {
for (String graph : this.graphs()) {
HugeGraph hugegraph = this.graph(graph);
if (InMemoryDBStoreProvider.matchType(hugegraph.backend())) {
boolean persistence = hugegraph.graphTransaction().store()
.features().supportsPersistence();
if (!persistence) {
hugegraph.initBackend();
}
BackendStoreSystemInfo info = new BackendStoreSystemInfo(hugegraph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ public final synchronized Cluster cluster() {
}

@Override
public final synchronized Session session() {
public final Session session() {
return (Session) super.getOrNewSession();
}

@Override
protected final synchronized Session newSession() {
protected Session newSession() {
E.checkState(this.cluster != null,
"Cassandra cluster has not been initialized");
return new Session();
Expand Down Expand Up @@ -157,7 +157,7 @@ public BatchStatement add(Statement statement) {
}

@Override
public void clear() {
// Rolls back the pending (uncommitted) work of this session by discarding
// all statements buffered in the batch. Nothing is sent to the server here;
// only the client-side buffer is dropped.
// NOTE(review): renamed from clear() in this diff — presumably to match the
// store-level rollback contract (see the rollbackTx() caller which invokes
// session.rollback() while in TxState.ROLLBACKING) — confirm against the
// abstract session interface this overrides.
public void rollback() {
this.batch.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void rollbackTx() {

session.txState(TxState.ROLLBACKING);
try {
session.clear();
session.rollback();
} finally {
// Assume batch commit would auto rollback
session.txState(TxState.CLEAN);
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.5.6</version>
<version>1.5.8</version>
</dependency>

<!-- tinkerpop -->
Expand Down
12 changes: 8 additions & 4 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,6 @@ public Iterator<Vertex> vertices(Query query) {
return this.graphTransaction().queryVertices(query);
}

public Iterator<Vertex> adjacentVertices(Iterator<Edge> edges) {
return this.graphTransaction().queryAdjacentVertices(edges);
}

@Override
public Iterator<Edge> edges(Object... objects) {
if (objects.length == 0) {
Expand All @@ -401,6 +397,14 @@ public Iterator<Edge> edges(Query query) {
return this.graphTransaction().queryEdges(query);
}

/**
 * Returns the vertices adjacent to the given edges by delegating to the
 * graph transaction's queryAdjacentVertices().
 *
 * @param edges iterator of edges whose endpoint vertices are wanted
 * @return iterator over the adjacent vertices
 */
public Iterator<Vertex> adjacentVertices(Iterator<Edge> edges) {
return this.graphTransaction().queryAdjacentVertices(edges);
}

/**
 * Returns the edges attached to the vertex with the given id by delegating
 * to the graph transaction's queryEdgesByVertex().
 *
 * @param vertexId id of the vertex whose incident edges are wanted
 * @return iterator over the vertex's edges
 */
public Iterator<Edge> adjacentEdges(Id vertexId) {
return this.graphTransaction().queryEdgesByVertex(vertexId);
}

public PropertyKey propertyKey(Id id) {
PropertyKey pk = this.schemaTransaction().getPropertyKey(id);
E.checkArgument(pk != null, "Undefined property key id: '%s'", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.baidu.hugegraph.type.HugeType;
import com.google.common.collect.ImmutableList;

public class CachedGraphTransaction extends GraphTransaction {
public final class CachedGraphTransaction extends GraphTransaction {

private final static int MAX_CACHE_EDGES_PER_QUERY = 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
Expand All @@ -40,7 +39,7 @@
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableSet;

public class CachedSchemaTransaction extends SchemaTransaction {
public final class CachedSchemaTransaction extends SchemaTransaction {

private final Cache idCache;
private final Cache nameCache;
Expand Down Expand Up @@ -153,12 +152,26 @@ private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}

private Object getOrFetch(HugeType type, Id id,
Function<Id, Object> fetcher) {
@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.resetCachedAllIfReachedCapacity();

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value == null) {
value = fetcher.apply(id);
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

Expand All @@ -169,15 +182,17 @@ private Object getOrFetch(HugeType type, Id id,
this.nameCache.update(prefixedName, schema);
}
}
return value;
return (T) value;
}

private Object getOrFetch(HugeType type, String name,
Function<String, Object> fetcher) {
@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Id prefixedName = generateId(type, name);
Object value = this.nameCache.get(prefixedName);
if (value == null) {
value = fetcher.apply(name);
value = super.getSchema(type, name);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

Expand All @@ -188,36 +203,6 @@ private Object getOrFetch(HugeType type, String name,
this.idCache.update(prefixedId, schema);
}
}
return value;
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.resetCachedAllIfReachedCapacity();

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Object value = this.getOrFetch(type, id,
k -> super.getSchema(type, id));
return (T) value;
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Object value = this.getOrFetch(type, name,
k -> super.getSchema(type, name));
return (T) value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class RamCache implements Cache {

// NOTE: the count in number of items, not in bytes
private final int capacity;
private final int halfCapacity;

// Implement LRU cache
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
Expand All @@ -63,14 +64,14 @@ public RamCache() {
}

public RamCache(int capacity) {
this.keyLock = new KeyLock();

if (capacity < 1) {
capacity = 1;
if (capacity < 0) {
capacity = 0;
}
this.keyLock = new KeyLock();
this.capacity = capacity;
this.halfCapacity = this.capacity >> 1;

int initialCapacity = capacity >> 3;
int initialCapacity = capacity >= MB ? capacity >> 10 : 256;
if (initialCapacity > MAX_INIT_CAP) {
initialCapacity = MAX_INIT_CAP;
}
Expand All @@ -80,9 +81,18 @@ public RamCache(int capacity) {
}

@Watched(prefix = "ramcache")
private Object access(Id id) {
private final Object access(Id id) {
assert id != null;

if (this.map.size() <= this.halfCapacity) {
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
assert id.equals(node.key());
return node.value();
}

final Lock lock = this.keyLock.lock(id);
try {
LinkNode<Id, Object> node = this.map.get(id);
Expand All @@ -91,7 +101,7 @@ private Object access(Id id) {
}

// NOTE: update the queue only if the size > capacity/2
if (this.map.size() > this.capacity >> 1) {
if (this.map.size() > this.halfCapacity) {
// Move the node from mid to tail
if (this.queue.remove(node) == null) {
// The node may be removed by others through dequeue()
Expand All @@ -100,13 +110,6 @@ private Object access(Id id) {
this.queue.enqueue(node);
}

// Ignore concurrent write for hits
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}

assert id.equals(node.key());
return node.value();
} finally {
Expand All @@ -115,7 +118,7 @@ private Object access(Id id) {
}

@Watched(prefix = "ramcache")
private void write(Id id, Object value) {
private final void write(Id id, Object value) {
assert id != null;
assert this.capacity > 0;

Expand Down Expand Up @@ -163,14 +166,13 @@ private void write(Id id, Object value) {

// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value));

} finally {
lock.unlock();
}
}

@Watched(prefix = "ramcache")
private void remove(Id id) {
private final void remove(Id id) {
assert id != null;

final Lock lock = this.keyLock.lock(id);
Expand All @@ -192,16 +194,23 @@ private void remove(Id id) {
@Override
public Object get(Id id) {
Object value = null;
if (this.map.containsKey(id)) {
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache missed '{}' (miss={}, hits={})",
id, this.miss, this.hits);
}
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
}
return value;
}
Expand All @@ -210,10 +219,11 @@ public Object get(Id id) {
@Override
public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
Object value = null;
if (this.map.containsKey(id)) {
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
Expand All @@ -223,6 +233,12 @@ public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
// Do fetch and update the cache
value = fetcher.apply(id);
this.update(id, value);
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public abstract class IdGenerator {

public abstract Id generate(HugeVertex vertex);

public static Id of(String id) {
public final static Id of(String id) {
return new StringId(id);
}

public static Id of(long id) {
public final static Id of(long id) {
return new LongId(id);
}

public static Id of(byte[] bytes, boolean number) {
public final static Id of(byte[] bytes, boolean number) {
return number ? new LongId(bytes) : new StringId(bytes);
}

Expand All @@ -45,7 +45,7 @@ public static Id of(byte[] bytes, boolean number) {
* @param id original string id value
* @return wrapped id object
*/
public Id generate(String id) {
public final Id generate(String id) {
return of(id);
}

Expand All @@ -54,7 +54,7 @@ public Id generate(String id) {
* @param id original long id value
* @return wrapped id object
*/
public Id generate(long id) {
public final Id generate(long id) {
return of(id);
}

Expand Down
Loading