Skip to content

Commit

Permalink
multiple threads adapt for kout/kneighbor
Browse files Browse the repository at this point in the history
Change-Id: I0791e3d307f2cff51ccc4760ea63c52468a4b8d8
  • Loading branch information
zhoney committed May 10, 2021
1 parent 9d5439f commit 5b56077
Show file tree
Hide file tree
Showing 23 changed files with 763 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import static com.baidu.hugegraph.traversal.algorithm.HugeTraverser.DEFAULT_DEGREE;
import static com.baidu.hugegraph.traversal.algorithm.HugeTraverser.DEFAULT_ELEMENTS_LIMIT;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import javax.inject.Singleton;
Expand All @@ -50,8 +48,10 @@
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.traversal.algorithm.records.KneighborRecords;
import com.baidu.hugegraph.traversal.algorithm.steps.EdgeStep;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser.PathSet;
import com.baidu.hugegraph.traversal.algorithm.KneighborTraverser;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -125,32 +125,27 @@ public String post(@Context GraphManager manager,

EdgeStep step = step(g, request.step);

Set<HugeTraverser.Node> results;
KneighborRecords results;
try (KneighborTraverser traverser = new KneighborTraverser(g)) {
results = traverser.customizedKneighbor(sourceId, step,
request.maxDepth,
request.limit);
}

Set<Id> neighbors = new HashSet<>();
for (HugeTraverser.Node node : results) {
neighbors.add(node.id());
}
Set<Id> neighbors = results.ids(request.limit);

List<HugeTraverser.Path> paths = new ArrayList<>();
PathSet paths = new PathSet();
if (request.withPath) {
for (HugeTraverser.Node node : results) {
paths.add(new HugeTraverser.Path(node.path()));
}
paths.addAll(results.paths(request.limit));
}
Iterator<Vertex> iter = QueryResults.emptyIterator();
if (request.withVertex) {
Set<Id> ids = new HashSet<>();
for (HugeTraverser.Node node : results) {
ids.add(node.id());
}
for (HugeTraverser.Path p : paths) {
ids.addAll(p.vertices());
ids.addAll(results.ids(request.limit));
if (request.withPath) {
for (HugeTraverser.Path p : paths) {
ids.addAll(p.vertices());
}
}
if (!ids.isEmpty()) {
iter = g.vertices(ids.toArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import static com.baidu.hugegraph.traversal.algorithm.HugeTraverser.DEFAULT_DEGREE;
import static com.baidu.hugegraph.traversal.algorithm.HugeTraverser.DEFAULT_ELEMENTS_LIMIT;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import javax.inject.Singleton;
Expand All @@ -51,9 +49,9 @@
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.traversal.algorithm.records.KoutRecords;
import com.baidu.hugegraph.traversal.algorithm.steps.EdgeStep;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser;
import com.baidu.hugegraph.traversal.algorithm.HugeTraverser.Node;
import com.baidu.hugegraph.traversal.algorithm.KoutTraverser;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -133,7 +131,7 @@ public String post(@Context GraphManager manager,

EdgeStep step = step(g, request.step);

Set<HugeTraverser.Node> results;
KoutRecords results;
try (KoutTraverser traverser = new KoutTraverser(g)) {
results = traverser.customizedKout(sourceId, step,
request.maxDepth,
Expand All @@ -142,25 +140,20 @@ public String post(@Context GraphManager manager,
request.limit);
}

Set<Id> neighbors = new HashSet<>();
for (HugeTraverser.Node node : results) {
neighbors.add(node.id());
}
Set<Id> neighbors = results.ids(request.limit);

List<HugeTraverser.Path> paths = new ArrayList<>();
HugeTraverser.PathSet paths = new HugeTraverser.PathSet();
if (request.withPath) {
for (HugeTraverser.Node node : results) {
paths.add(new HugeTraverser.Path(node.path()));
}
paths.addAll(results.paths(request.limit));
}
Iterator<Vertex> iter = QueryResults.emptyIterator();
if (request.withVertex) {
Set<Id> ids = new HashSet<>();
for (Node node : results) {
ids.add(node.id());
}
for (HugeTraverser.Path p : paths) {
ids.addAll(p.vertices());
ids.addAll(results.ids(request.limit));
if (request.withPath) {
for (HugeTraverser.Path p : paths) {
ids.addAll(p.vertices());
}
}
if (!ids.isEmpty()) {
iter = g.vertices(ids.toArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,10 @@ public static HugeEdge constructEdge(HugeVertex ownerVertex,

VertexLabel otherVertexLabel;
if (isOutEdge) {
// ownerVertex.correctVertexLabel(srcLabel);
ownerVertex.correctVertexLabel(srcLabel);
otherVertexLabel = tgtLabel;
} else {
// ownerVertex.correctVertexLabel(tgtLabel);
ownerVertex.correctVertexLabel(tgtLabel);
otherVertexLabel = srcLabel;
}
HugeVertex otherVertex = new HugeVertex(graph, otherVertexId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.query.QueryResults;
import com.baidu.hugegraph.backend.tx.GraphTransaction;
import com.baidu.hugegraph.backend.tx.GraphTransaction.LimitIterator;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
Expand All @@ -60,7 +61,6 @@
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.collection.CollectionFactory;
import com.baidu.hugegraph.util.collection.ObjectIntMapping;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -152,26 +152,6 @@ protected Set<Id> adjacentVertices(Id source, EdgeStep step) {
return neighbors;
}

protected Set<Node> adjacentVertices(Set<Node> vertices, EdgeStep step,
Set<Node> excluded, long remaining) {
Set<Node> neighbors = newSet();
for (Node source : vertices) {
Iterator<Edge> edges = this.edgesOfVertex(source.id(), step);
while (edges.hasNext()) {
Id target = ((HugeEdge) edges.next()).id().otherVertexId();
KNode kNode = new KNode(target, (KNode) source);
if (excluded != null && excluded.contains(kNode)) {
continue;
}
neighbors.add(kNode);
if (remaining != NO_LIMIT && --remaining <= 0L) {
return neighbors;
}
}
}
return neighbors;
}

@Watched
protected Iterator<Edge> edgesOfVertex(Id source, Directions dir,
Id label, long limit) {
Expand All @@ -190,16 +170,26 @@ protected Iterator<Edge> edgesOfVertex(Id source, Directions dir,
@Watched
protected Iterator<Edge> edgesOfVertex(Id source, Directions dir,
Map<Id, String> labels, long limit) {
Iterator<Edge> result;
if (labels == null || labels.isEmpty()) {
return this.edgesOfVertex(source, dir, (Id) null, limit);
}
ExtendableIterator<Edge> results = new ExtendableIterator<>();
ExtendableIterator<Edge> iterators = new ExtendableIterator<>();
for (Id label : labels.keySet()) {
E.checkNotNull(label, "edge label");
// TODO: limit should be applied to all labels
results.extend(this.edgesOfVertex(source, dir, label, limit));
iterators.extend(this.edgesOfVertex(source, dir, label, limit));
}
return results;

result = iterators;
if (limit != NO_LIMIT) {
Query query = new Query(HugeType.EDGE);
query.limit(limit);
result = new LimitIterator<>(iterators, e -> {
long count = query.goOffset(1L);
return query.reachLimit(count - 1L);
});
}
return result;
}

protected Iterator<Edge> edgesOfVertex(Id source, EdgeStep edgeStep) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@

import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;

import org.apache.tinkerpop.gremlin.structure.Edge;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.structure.HugeEdge;
import com.baidu.hugegraph.traversal.algorithm.records.KneighborRecords;
import com.baidu.hugegraph.traversal.algorithm.records.record.RecordType;
import com.baidu.hugegraph.traversal.algorithm.steps.EdgeStep;
import com.baidu.hugegraph.type.define.Directions;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -65,46 +71,68 @@ public Set<Id> kneighbor(Id sourceV, Directions dir,
return all;
}

public Set<Node> customizedKneighbor(Id source, EdgeStep step,
int maxDepth, long limit) {
public KneighborRecords customizedKneighbor(Id source, EdgeStep step,
int maxDepth, long limit) {
E.checkNotNull(source, "source vertex id");
this.checkVertexExist(source, "source vertex");
checkPositive(maxDepth, "k-neighbor max_depth");
checkLimit(limit);

boolean single = maxDepth < this.concurrentDepth() ||
step.direction() != Directions.BOTH;
return this.customizedKneighbor(source, step, maxDepth,
limit, single);
Traverser traverser = new Traverser(source, step, maxDepth, limit,
single);
return traverser.customizedKneighbor();
}

public Set<Node> customizedKneighbor(Id source, EdgeStep step, int maxDepth,
long limit, boolean single) {
Set<Node> latest = newSet(single);
Set<Node> all = newSet(single);
private class Traverser {

Node sourceV = new KNode(source, null);
private final KneighborRecords record;

latest.add(sourceV);
all.add(sourceV);
private final EdgeStep step;
private final long limit;
private final boolean single;
private int depth;

while (maxDepth-- > 0) {
long remaining = limit == NO_LIMIT ? NO_LIMIT : limit - all.size();
latest = this.adjacentVertices(latest, step, all,
remaining, single);
int size = all.size() + latest.size();
if (limit != NO_LIMIT && size >= limit) {
int subLength = (int) limit - all.size();
Iterator<Node> iterator = latest.iterator();
for (int i = 0; i < subLength && iterator.hasNext(); i++) {
all.add(iterator.next());
private boolean stop;

public Traverser(Id source, EdgeStep step, int maxDepth,
long limit, boolean single) {
this.record = new KneighborRecords(source, RecordType.INT,
true, single);
this.step = step;
this.depth = maxDepth;
this.limit = limit;
this.single = single;
this.stop = false;
}

public KneighborRecords customizedKneighbor() {
Consumer<Id> consumer = v -> {
Iterator<Edge> edges = edgesOfVertex(v, step);
while (edges.hasNext()) {
Id target = ((HugeEdge) edges.next()).id().otherVertexId();
this.record.addPath(v, target);
this.checkLimit(this.limit, this.depth, this.record.size());
}
break;
} else {
all.addAll(latest);
};

while (this.depth-- > 0) {
this.record.startOneLayer(true);
traverseIds(this.record.keys(), consumer,
this.single, this.stop);
this.record.finishOneLayer();
}
return this.record;
}

return all;
private void checkLimit(long limit, long depth, int size) {
if (limit == NO_LIMIT || depth > 0) {
return;
}
if (size >= limit) {
this.stop = true;
}
}
}
}
Loading

0 comments on commit 5b56077

Please sign in to comment.