Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve tx-cache and index&ids query #105

Merged
merged 1 commit into from
Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public interface Cache {

public void update(Id id, Object value);

public void updateIfAbsent(Id id, Object value) ;
public void updateIfAbsent(Id id, Object value);

public void updateIfPresent(Id id, Object value);

public void invalidate(Id id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,24 @@
package com.baidu.hugegraph.backend.cache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.cache.CachedBackendStore.QueryId;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.tx.GraphTransaction;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.structure.HugeEdge;
import com.baidu.hugegraph.structure.HugeEdgeProperty;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.structure.HugeVertexProperty;
import com.baidu.hugegraph.type.HugeType;
import com.google.common.collect.ImmutableList;

public class CachedGraphTransaction extends GraphTransaction {
Expand Down Expand Up @@ -72,146 +69,97 @@ private Cache cache(String prefix, int capacity, long expire) {
}

@Override
public Iterator<Vertex> queryVertices(Object... vertexIds) {
List<Vertex> vertices = new ArrayList<>(vertexIds.length);
for (Object vertexId : vertexIds) {
if (vertexId == null) {
continue;
}
Id vid = HugeVertex.getIdValue(vertexId);
Object v = this.verticesCache.getOrFetch(vid, id -> {
Iterator<Vertex> iterator = super.queryVertices(id);
return iterator.hasNext() ? iterator.next() : null;
});
if (v != null) {
vertices.add((Vertex) v);
}
protected Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
if (!query.ids().isEmpty() && query.conditions().isEmpty()) {
return this.queryVerticesByIds((IdQuery) query);
} else {
return super.queryVerticesFromBackend(query);
}
return vertices.iterator();
}

@Override
public Iterator<Vertex> queryVertices(Query query) {
if (!query.ids().isEmpty() && query.conditions().isEmpty()) {
return this.queryVertices(query.ids().toArray());
} else {
return super.queryVertices(query);
private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
IdQuery newQuery = new IdQuery(HugeType.VERTEX, query);
List<HugeVertex> vertices = new ArrayList<>(query.ids().size());
for (Id vertexId : query.ids()) {
Object vertex = this.verticesCache.get(vertexId);
if (vertex != null) {
vertices.add((HugeVertex) vertex);
} else {
newQuery.query(vertexId);
}
}
if (vertices.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems no need to set 'newQuery = query'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes both are ok

// Just use the origin query if find none from the cache
newQuery = query;
}
if (!newQuery.empty()) {
Iterator<HugeVertex> rs = super.queryVerticesFromBackend(newQuery);
while (rs.hasNext()) {
HugeVertex vertex = rs.next();
vertices.add(vertex);
this.verticesCache.update(vertex.id(), vertex);
}
}
return vertices.iterator();
}

@Override
public Iterator<Edge> queryEdges(Query query) {
protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
if (query.empty()) {
// Query all edges, don't cache it
return super.queryEdges(query);
return super.queryEdgesFromBackend(query);
}

Id id = new QueryId(query);
@SuppressWarnings("unchecked")
List<Edge> edges = (List<Edge>) this.edgesCache.get(id);
List<HugeEdge> edges = (List<HugeEdge>) this.edgesCache.get(id);
if (edges == null) {
// Iterator can't be cached, caching list instead
edges = ImmutableList.copyOf(super.queryEdges(query));
edges = ImmutableList.copyOf(super.queryEdgesFromBackend(query));
if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
this.edgesCache.update(id, edges);
}
}
return edges.iterator();
}

@Override
public HugeVertex addVertex(HugeVertex vertex) {
// Update vertex cache
this.verticesCache.invalidate(vertex.id());

return super.addVertex(vertex);
}

@Override
public void removeVertex(HugeVertex vertex) {
// Update vertex cache
this.verticesCache.invalidate(vertex.id());

// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();

super.removeVertex(vertex);
}

@Override
public <V> void addVertexProperty(HugeVertexProperty<V> prop) {
// Update vertex cache
this.verticesCache.invalidate(prop.element().id());

super.addVertexProperty(prop);
}

@Override
public <V> void removeVertexProperty(HugeVertexProperty<V> prop) {
// Update vertex cache
this.verticesCache.invalidate(prop.element().id());

super.removeVertexProperty(prop);
}

@Override
public HugeEdge addEdge(HugeEdge edge) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();

return super.addEdge(edge);
}

@Override
public void removeEdge(HugeEdge edge) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();

super.removeEdge(edge);
}

@Override
public void removeEdges(EdgeLabel edgeLabel) {
super.removeEdges(edgeLabel);

// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
}

@Override
public <V> void addEdgeProperty(HugeEdgeProperty<V> prop) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();

super.addEdgeProperty(prop);
}

@Override
public <V> void removeEdgeProperty(HugeEdgeProperty<V> prop) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
public void commit() throws BackendException {
// Collect changes before commit
Collection<HugeVertex> changes = this.verticesInTxUpdated();
Collection<HugeVertex> deletions = this.verticesInTxRemoved();
int edgesInTxSize = this.edgesInTxSize();

try {
super.commit();
} finally {
// Update vertex cache
for (HugeVertex vertex : changes) {
this.verticesCache.updateIfPresent(vertex.id(), vertex);
}
for (HugeVertex vertex : deletions) {
this.verticesCache.invalidate(vertex.id());
}

super.removeEdgeProperty(prop);
// Update edge cache if any edges change
if (edgesInTxSize > 0) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
}
}
}

@Override
public void removeIndex(IndexLabel indexLabel) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();

super.removeIndex(indexLabel);
}

@Override
public void rollback() throws BackendException {
// Update vertex cache
for (Id id : this.verticesInTx()) {
this.verticesCache.invalidate(id);
try {
super.removeIndex(indexLabel);
} finally {
// Update edge cache if needed (any edge-index is deleted)
if (indexLabel.baseType() == HugeType.EDGE_LABEL) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
}
}

// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();

super.rollback();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ public void updateIfAbsent(Id id, Object value) {
this.write(id, value);
}

@Watched(prefix = "ramcache")
@Override
public void updateIfPresent(Id id, Object value) {
if (id == null || value == null ||
this.capacity <= 0 || !this.map.containsKey(id)) {
return;
}
this.write(id, value);
}

@Watched(prefix = "ramcache")
@Override
public void invalidate(Id id) {
Expand All @@ -259,9 +269,8 @@ public void invalidate(Id id) {
@Override
public void traverse(Consumer<Object> consumer) {
E.checkNotNull(consumer, "consumer");
for (LinkNode<Id, Object> node : this.map.values()) {
consumer.accept(node.value());
}
// NOTE: forEach is 20% faster than for-in with ConcurrentHashMap
this.map.values().forEach(node -> consumer.accept(node.value()));
}

@Watched(prefix = "ramcache")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void commit() throws BackendException {
return;
}

// Do rate limit if need
// Do rate limit if needed
RateLimiter rateLimiter = this.graph.rateLimiter();
if (rateLimiter != null) {
int size = this.mutationSize();
Expand All @@ -170,6 +170,7 @@ public void commit() throws BackendException {
}
}

@Override
public void commitIfGtSize(int size) throws BackendException {
if (this.mutationSize() >= size) {
this.commit();
Expand Down
Loading