Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
Change-Id: I761abfa4940ebaa1dd331e9c60c80293bea5d2cb
  • Loading branch information
zhoney committed May 11, 2021
1 parent ac48206 commit 29046da
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

public class KneighborTraverser extends OltpTraverser {

private volatile boolean stop;

public KneighborTraverser(HugeGraph graph) {
super(graph);
}
Expand Down Expand Up @@ -63,7 +65,7 @@ public Set<Id> kneighbor(Id sourceV, Directions dir,
latest = this.adjacentVertices(latest, dir, labelId, all,
degree, remaining);
all.addAll(latest);
if (limit != NO_LIMIT && all.size() >= limit) {
if (reachLimit(limit, all.size())) {
break;
}
}
Expand All @@ -78,61 +80,37 @@ public KneighborRecords customizedKneighbor(Id source, EdgeStep step,
checkPositive(maxDepth, "k-neighbor max_depth");
checkLimit(limit);

boolean single = maxDepth < this.concurrentDepth() ||
step.direction() != Directions.BOTH;
Traverser traverser = new Traverser(source, step, maxDepth, limit,
single);
return traverser.customizedKneighbor();
}

private class Traverser {

private final KneighborRecords record;

private final EdgeStep step;
private final long limit;
private final boolean single;
private int depth;

private boolean stop;

public Traverser(Id source, EdgeStep step, int maxDepth,
long limit, boolean single) {
this.record = new KneighborRecords(source, RecordType.INT,
true, single);
this.step = step;
this.depth = maxDepth;
this.limit = limit;
this.single = single;
this.stop = false;
}
boolean concurrent = maxDepth >= this.concurrentDepth() &&
step.direction() == Directions.BOTH;

public KneighborRecords customizedKneighbor() {
Consumer<Id> consumer = v -> {
Iterator<Edge> edges = edgesOfVertex(v, step);
while (edges.hasNext()) {
Id target = ((HugeEdge) edges.next()).id().otherVertexId();
this.record.addPath(v, target);
this.checkLimit(this.limit, this.depth, this.record.size());
}
};

while (this.depth-- > 0) {
this.record.startOneLayer(true);
traverseIds(this.record.keys(), consumer,
this.single, this.stop);
this.record.finishOneLayer();
}
return this.record;
}
KneighborRecords records = new KneighborRecords(source, RecordType.INT,
true, concurrent);

private void checkLimit(long limit, long depth, int size) {
if (limit == NO_LIMIT || depth > 0) {
Consumer<Id> consumer = v -> {
if (this.stop) {
return;
}
if (size >= limit) {
this.stop = true;
Iterator<Edge> edges = edgesOfVertex(v, step);
while (!this.stop && edges.hasNext()) {
Id target = ((HugeEdge) edges.next()).id().otherVertexId();
records.addPath(v, target);
this.reachLimit(limit, records.size());
}
};

while (maxDepth-- > 0) {
records.startOneLayer(true);
traverseIds(records.keys(), consumer, concurrent);
records.finishOneLayer();
}
return records;
}

private boolean reachLimit(long limit, int size) {
if (limit == NO_LIMIT || size < limit) {
return false;
}
this.stop = true;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@

public class KoutTraverser extends OltpTraverser {

private int depth;
private volatile boolean stop = false;

public KoutTraverser(HugeGraph graph) {
super(graph);
}
Expand All @@ -51,6 +54,7 @@ public Set<Id> kout(Id sourceV, Directions dir, String label,
checkDegree(degree);
checkCapacity(capacity);
checkLimit(limit);
this.depth = depth;
if (capacity != NO_LIMIT) {
// Capacity must > limit because sourceV is counted in capacity
E.checkArgument(capacity >= limit && limit != NO_LIMIT,
Expand All @@ -69,9 +73,9 @@ public Set<Id> kout(Id sourceV, Directions dir, String label,

long remaining = capacity == NO_LIMIT ?
NO_LIMIT : capacity - latest.size();
while (depth-- > 0) {
while (this.depth-- > 0) {
// Just get limit nodes in last layer if limit < remaining capacity
if (depth == 0 && limit != NO_LIMIT &&
if (this.depth == 0 && limit != NO_LIMIT &&
(limit < remaining || remaining == NO_LIMIT)) {
remaining = limit;
}
Expand All @@ -87,10 +91,10 @@ public Set<Id> kout(Id sourceV, Directions dir, String label,
// Update 'remaining' value to record remaining capacity
remaining -= latest.size();

if (remaining <= 0 && depth > 0) {
if (remaining <= 0 && this.depth > 0) {
throw new HugeException(
"Reach capacity '%s' while remaining depth '%s'",
capacity, depth);
capacity, this.depth);
}
}
}
Expand All @@ -106,79 +110,51 @@ public KoutRecords customizedKout(Id source, EdgeStep step,
checkPositive(maxDepth, "k-out max_depth");
checkCapacity(capacity);
checkLimit(limit);
this.depth = maxDepth;
boolean concurrent = maxDepth >= this.concurrentDepth() &&
step.direction() == Directions.BOTH;
KoutRecords records = new KoutRecords(source, RecordType.INT,
nearest, concurrent);

Consumer<Id> consumer = v -> {
if (this.stop) {
return;
}
Iterator<Edge> edges = edgesOfVertex(v, step);
while (!this.stop && edges.hasNext()) {
Id target = ((HugeEdge) edges.next()).id().otherVertexId();
records.addPath(v, target);

boolean single = maxDepth < this.concurrentDepth() ||
step.direction() != Directions.BOTH;
Traverser traverser = new Traverser(source, step, maxDepth, nearest,
capacity, limit, single);
return traverser.customizedKout();
}
this.checkCapacity(capacity, records.accessed(), this.depth);
this.checkLimit(limit, this.depth, records.size());
}
};

private class Traverser {

private final KoutRecords record;

private final EdgeStep step;
private final long capacity;
private final long limit;
private final boolean single;
private int depth;

private boolean stop;

public Traverser(Id source, EdgeStep step, int maxDepth,
boolean nearest, long capacity, long limit,
boolean single) {
this.record = new KoutRecords(source, RecordType.INT,
nearest, single);
this.step = step;
this.depth = maxDepth;
this.capacity = capacity;
this.limit = limit;
this.single = single;
this.stop = false;
while (this.depth-- > 0) {
records.startOneLayer(true);
traverseIds(records.keys(), consumer, concurrent);
records.finishOneLayer();
}
return records;
}

public KoutRecords customizedKout() {
Consumer<Id> consumer = v -> {
Iterator<Edge> edges = edgesOfVertex(v, step);
while (edges.hasNext()) {
Id target = ((HugeEdge) edges.next()).id().otherVertexId();
this.record.addPath(v, target);

this.checkCapacity(this.capacity, this.record.accessed(),
this.depth);
this.checkLimit(this.limit, this.depth, this.record.size());
}
};

while (this.depth-- > 0) {
this.record.startOneLayer(true);
traverseIds(this.record.keys(), consumer,
this.single, this.stop);
this.record.finishOneLayer();
}
return this.record;
private void checkCapacity(long capacity, long accessed, int depth) {
if (capacity == NO_LIMIT) {
return;
}

private void checkCapacity(long capacity, long accessed, int depth) {
if (capacity == NO_LIMIT) {
return;
}
if (capacity <= accessed && depth > 0) {
throw new HugeException(
"Reach capacity '%s' while remaining depth '%s'",
capacity, depth);
}
if (accessed >= capacity && depth > 0) {
throw new HugeException(
"Reach capacity '%s' while remaining depth '%s'",
capacity, depth);
}
}

private void checkLimit(long limit, long depth, int size) {
if (limit == NO_LIMIT || depth > 0) {
return;
}
if (size >= limit) {
this.stop = true;
}
private void checkLimit(long limit, long depth, int size) {
if (limit == NO_LIMIT || depth > 0) {
return;
}
if (size >= limit) {
this.stop = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,17 @@ protected Set<Node> adjacentVertices(Set<Node> vertices, EdgeStep step,

protected long traverseNodes(Iterator<Node> vertices,
Consumer<Node> consumer) {
return this.traverse(vertices, consumer, "traverse-nodes", false);
return this.traverse(vertices, consumer, "traverse-nodes");
}

protected long traversePairs(Iterator<Pair<Id, Id>> pairs,
Consumer<Pair<Id, Id>> consumer) {
return this.traverse(pairs, consumer, "traverse-pairs", false);
return this.traverse(pairs, consumer, "traverse-pairs");
}

protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
boolean single, boolean stop) {
if (!single) {
boolean concurrent) {
if (concurrent) {
return this.traverseIds(ids, consumer);
} else {
long count = 0L;
Expand All @@ -158,17 +158,12 @@ protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
}

protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer) {
return this.traverseIds(ids, consumer, false);
}

protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
boolean stop) {
return this.traverse(ids, consumer, "traverse-ids", stop);
return this.traverse(ids, consumer, "traverse-ids");
}

protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
String name, boolean stop) {
if (!iterator.hasNext() || stop) {
String name) {
if (!iterator.hasNext()) {
return 0L;
}

Expand All @@ -177,7 +172,7 @@ protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
consumers.start(name);
long total = 0L;
try {
while (!stop && iterator.hasNext()) {
while (iterator.hasNext()) {
total++;
K v = iterator.next();
consumers.provide(v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,25 @@
import com.baidu.hugegraph.traversal.algorithm.records.record.Record;
import com.baidu.hugegraph.traversal.algorithm.records.record.RecordFactory;
import com.baidu.hugegraph.traversal.algorithm.records.record.RecordType;
import com.baidu.hugegraph.util.collection.ObjectIntMapping;
import com.baidu.hugegraph.util.collection.mapping.ObjectIntMapping;
import com.baidu.hugegraph.util.collection.mapping.MappingFactory;

public abstract class AbstractRecords implements Records {

private final ObjectIntMapping<Id> idMapping;
protected final RecordType type;
private final boolean single;
private final RecordType type;
private final boolean concurrent;

public AbstractRecords(RecordType type, boolean single) {
this.idMapping = new ObjectIntMapping<>();
public AbstractRecords(RecordType type, boolean concurrent) {
this.type = type;
this.single = single;
this.concurrent = concurrent;
this.idMapping = MappingFactory.newObjectIntMapping(this.concurrent);
}

public AbstractRecords(RecordType type) {
this.idMapping = new ObjectIntMapping<>();
this.type = type;
this.single = true;
this.concurrent = false;
this.idMapping = MappingFactory.newObjectIntMapping();
}

@Watched
Expand All @@ -55,7 +56,6 @@ protected Id id(int code) {
}

protected Record newRecord() {
return RecordFactory.newRecord(this.type, single);
return RecordFactory.newRecord(this.type, this.concurrent);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@

public class KneighborRecords extends SingleWayMultiPathsRecords {

private Id source;
private final Id source;

public KneighborRecords(Id source, RecordType type,
boolean nearest, boolean single) {
super(source, type, nearest, single);
boolean nearest, boolean concurrent) {
super(source, type, nearest, concurrent);
this.source = source;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class KoutRecords extends SingleWayMultiPathsRecords {

public KoutRecords(Id source,
RecordType type,
boolean nearest, boolean single) {
super(source, type, nearest, single);
boolean nearest, boolean concurrent) {
super(source, type, nearest, concurrent);
}

@Override
Expand Down
Loading

0 comments on commit 29046da

Please sign in to comment.