Skip to content

Commit

Permalink
prepare for mapdb backend
Browse files Browse the repository at this point in the history
Change-Id: I046bde4a32b3fd6693f788d27a9165e2646bc5cb
  • Loading branch information
javeme committed Feb 3, 2019
1 parent 1885132 commit db8b560
Show file tree
Hide file tree
Showing 41 changed files with 476 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.baidu.hugegraph.backend.cache.Cache;
import com.baidu.hugegraph.backend.cache.CacheManager;
import com.baidu.hugegraph.backend.store.BackendStoreSystemInfo;
import com.baidu.hugegraph.backend.store.memory.InMemoryDBStoreProvider;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.exception.NotSupportException;
Expand Down Expand Up @@ -189,7 +188,9 @@ private void loadGraph(String name, String path) {
private void checkBackendVersionOrExit() {
for (String graph : this.graphs()) {
HugeGraph hugegraph = this.graph(graph);
if (InMemoryDBStoreProvider.matchType(hugegraph.backend())) {
boolean persistence = hugegraph.graphTransaction().store()
.features().supportsPersistence();
if (!persistence) {
hugegraph.initBackend();
}
BackendStoreSystemInfo info = new BackendStoreSystemInfo(hugegraph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ public final synchronized Cluster cluster() {
}

@Override
public final synchronized Session session() {
public final Session session() {
return (Session) super.getOrNewSession();
}

@Override
protected final synchronized Session newSession() {
protected Session newSession() {
E.checkState(this.cluster != null,
"Cassandra cluster has not been initialized");
return new Session();
Expand Down Expand Up @@ -157,7 +157,7 @@ public BatchStatement add(Statement statement) {
}

@Override
public void clear() {
public void rollback() {
this.batch.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void rollbackTx() {

session.txState(TxState.ROLLBACKING);
try {
session.clear();
session.rollback();
} finally {
// Assume batch commit would auto rollback
session.txState(TxState.CLEAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.baidu.hugegraph.type.HugeType;
import com.google.common.collect.ImmutableList;

public class CachedGraphTransaction extends GraphTransaction {
public final class CachedGraphTransaction extends GraphTransaction {

private final static int MAX_CACHE_EDGES_PER_QUERY = 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
Expand All @@ -40,7 +39,7 @@
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableSet;

public class CachedSchemaTransaction extends SchemaTransaction {
public final class CachedSchemaTransaction extends SchemaTransaction {

private final Cache idCache;
private final Cache nameCache;
Expand Down Expand Up @@ -153,12 +152,26 @@ private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}

private Object getOrFetch(HugeType type, Id id,
Function<Id, Object> fetcher) {
@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.resetCachedAllIfReachedCapacity();

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value == null) {
value = fetcher.apply(id);
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

Expand All @@ -169,15 +182,17 @@ private Object getOrFetch(HugeType type, Id id,
this.nameCache.update(prefixedName, schema);
}
}
return value;
return (T) value;
}

private Object getOrFetch(HugeType type, String name,
Function<String, Object> fetcher) {
@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Id prefixedName = generateId(type, name);
Object value = this.nameCache.get(prefixedName);
if (value == null) {
value = fetcher.apply(name);
value = super.getSchema(type, name);
if (value != null) {
this.resetCachedAllIfReachedCapacity();

Expand All @@ -188,36 +203,6 @@ private Object getOrFetch(HugeType type, String name,
this.idCache.update(prefixedId, schema);
}
}
return value;
}

@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);

this.resetCachedAllIfReachedCapacity();

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Object value = this.getOrFetch(type, id,
k -> super.getSchema(type, id));
return (T) value;
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Object value = this.getOrFetch(type, name,
k -> super.getSchema(type, name));
return (T) value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class RamCache implements Cache {

// NOTE: the count in number of items, not in bytes
private final int capacity;
private final int halfCapacity;

// Implement LRU cache
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
Expand All @@ -63,14 +64,14 @@ public RamCache() {
}

public RamCache(int capacity) {
this.keyLock = new KeyLock();

if (capacity < 1) {
capacity = 1;
if (capacity < 0) {
capacity = 0;
}
this.keyLock = new KeyLock();
this.capacity = capacity;
this.halfCapacity = this.capacity >> 1;

int initialCapacity = capacity >> 3;
int initialCapacity = capacity >= MB ? capacity >> 10 : 256;
if (initialCapacity > MAX_INIT_CAP) {
initialCapacity = MAX_INIT_CAP;
}
Expand All @@ -80,9 +81,18 @@ public RamCache(int capacity) {
}

@Watched(prefix = "ramcache")
private Object access(Id id) {
private final Object access(Id id) {
assert id != null;

if (this.map.size() <= this.halfCapacity) {
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
assert id.equals(node.key());
return node.value();
}

final Lock lock = this.keyLock.lock(id);
try {
LinkNode<Id, Object> node = this.map.get(id);
Expand All @@ -91,7 +101,7 @@ private Object access(Id id) {
}

// NOTE: update the queue only if the size > capacity/2
if (this.map.size() > this.capacity >> 1) {
if (this.map.size() > this.halfCapacity) {
// Move the node from mid to tail
if (this.queue.remove(node) == null) {
// The node may be removed by others through dequeue()
Expand All @@ -100,13 +110,6 @@ private Object access(Id id) {
this.queue.enqueue(node);
}

// Ignore concurrent write for hits
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}

assert id.equals(node.key());
return node.value();
} finally {
Expand All @@ -115,7 +118,7 @@ private Object access(Id id) {
}

@Watched(prefix = "ramcache")
private void write(Id id, Object value) {
private final void write(Id id, Object value) {
assert id != null;
assert this.capacity > 0;

Expand Down Expand Up @@ -163,14 +166,13 @@ private void write(Id id, Object value) {

// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value));

} finally {
lock.unlock();
}
}

@Watched(prefix = "ramcache")
private void remove(Id id) {
private final void remove(Id id) {
assert id != null;

final Lock lock = this.keyLock.lock(id);
Expand All @@ -192,16 +194,23 @@ private void remove(Id id) {
@Override
public Object get(Id id) {
Object value = null;
if (this.map.containsKey(id)) {
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache missed '{}' (miss={}, hits={})",
id, this.miss, this.hits);
}
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
}
return value;
}
Expand All @@ -210,10 +219,11 @@ public Object get(Id id) {
@Override
public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
Object value = null;
if (this.map.containsKey(id)) {
if (this.map.size() <= this.halfCapacity || this.map.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
Expand All @@ -223,6 +233,12 @@ public Object getOrFetch(Id id, Function<Id, Object> fetcher) {
// Do fetch and update the cache
value = fetcher.apply(id);
this.update(id, value);
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public abstract class IdGenerator {

public abstract Id generate(HugeVertex vertex);

public static Id of(String id) {
public final static Id of(String id) {
return new StringId(id);
}

public static Id of(long id) {
public final static Id of(long id) {
return new LongId(id);
}

public static Id of(byte[] bytes, boolean number) {
public final static Id of(byte[] bytes, boolean number) {
return number ? new LongId(bytes) : new StringId(bytes);
}

Expand All @@ -45,7 +45,7 @@ public static Id of(byte[] bytes, boolean number) {
* @param id original string id value
* @return wrapped id object
*/
public Id generate(String id) {
public final Id generate(String id) {
return of(id);
}

Expand All @@ -54,7 +54,7 @@ public Id generate(String id) {
* @param id original long id value
* @return wrapped id object
*/
public Id generate(long id) {
public final Id generate(long id) {
return of(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,6 @@ public boolean test(HugeElement element) {
public Condition copy() {
return new And(this.left().copy(), this.right().copy());
}

@Override
public boolean isFlattened() {
// If this is flattened, its sub-condition should not be nested
return this.left().isRelation() && this.right().isRelation();
}
}

public static class Or extends BinCondition {
Expand Down Expand Up @@ -442,6 +436,10 @@ public abstract static class Relation extends Condition {
// The value serialized(code/string) by backend store.
protected Object serialValue;

protected Set<RelationType> UNFLATTEN_RELATION_TYPES = ImmutableSet.of(
RelationType.IN, RelationType.NOT_IN,
RelationType.TEXT_CONTAINS_ANY);

@Override
public ConditionType type() {
return ConditionType.RELATION;
Expand Down Expand Up @@ -476,6 +474,11 @@ public boolean test(Object value) {
return this.relation.test(value, this.value);
}

@Override
public boolean isFlattened() {
return !this.UNFLATTEN_RELATION_TYPES.contains(this.relation);
}

@Override
public List<? extends Relation> relations() {
return ImmutableList.of(this);
Expand Down
Loading

0 comments on commit db8b560

Please sign in to comment.