Skip to content

Commit

Permalink
feat(core): support batch+parallel edges traverse (#2312)
Browse files Browse the repository at this point in the history
## Main Changes

- Enhance Consumers.java, supporting ExceptionHandle and `Future` to handle InterruptedException when awaiting
- Add Nested Iterator Edge and support batch execution
- Support batch execution and thread-level parallelism in KoutTraverser and KneighborTraverser
  • Loading branch information
DanGuge authored Oct 24, 2023
1 parent b43526d commit 8db0a9b
Show file tree
Hide file tree
Showing 10 changed files with 516 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.query;

import java.util.Iterator;
import java.util.List;

import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.type.define.Directions;

/**
 * Lazily turns a stream of source-vertex ids into per-vertex edge queries,
 * so that batch edge traversal can issue one {@link Query} per source.
 *
 * <p>Not thread-safe: it simply delegates to the supplied {@code sources}
 * iterator. A negative {@code limit} is treated as "no limit".
 */
public class EdgesQueryIterator implements Iterator<Query> {

    private final List<Id> labels;
    private final Directions directions;
    private final long limit;
    private final Iterator<Id> sources;

    /**
     * @param sources    ids of the source vertices to build queries for
     * @param directions edge direction(s) to traverse from each source
     * @param labels     edge-label ids to filter by (empty means all labels)
     * @param limit      max edges per source; any negative value means no limit
     */
    public EdgesQueryIterator(Iterator<Id> sources,
                              Directions directions,
                              List<Id> labels,
                              long limit) {
        this.sources = sources;
        this.labels = labels;
        this.directions = directions;
        // The traversal-layer NO_LIMIT sentinel differs from Query.NO_LIMIT,
        // so normalize any negative value to Query.NO_LIMIT here
        this.limit = limit < 0 ? Query.NO_LIMIT : limit;
    }

    @Override
    public boolean hasNext() {
        return this.sources.hasNext();
    }

    @Override
    public Query next() {
        Id sourceId = this.sources.next();
        ConditionQuery query = GraphTransaction.constructEdgesQuery(sourceId,
                                                                    this.directions,
                                                                    this.labels);
        if (this.limit != Query.NO_LIMIT) {
            // Bound both the result size and the scan capacity per source
            query.limit(this.limit);
            query.capacity(this.limit);
        } else {
            query.capacity(Query.NO_CAPACITY);
        }
        return query;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
import org.apache.hugegraph.type.define.NodeRole;
import org.apache.hugegraph.util.*;
import org.apache.hugegraph.util.Consumers;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.LockUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;

public final class TaskManager {

private static final Logger LOG = Log.logger(TaskManager.class);
Expand All @@ -48,7 +49,7 @@ public final class TaskManager {
public static final String TASK_SCHEDULER = "task-scheduler-%d";

protected static final long SCHEDULE_PERIOD = 1000L; // unit ms

private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);

Expand Down Expand Up @@ -134,7 +135,7 @@ private void closeTaskTx(HugeGraphParams graph) {
graph.closeTx();
} else {
Consumers.executeOncePerThread(this.taskExecutor, totalThreads,
graph::closeTx);
graph::closeTx, TX_CLOSE_TIMEOUT);
}
} catch (Exception e) {
throw new HugeException("Exception when closing task tx", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hugegraph.traversal.algorithm;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -37,6 +39,7 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.EdgesQueryIterator;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.tx.GraphTransaction;
Expand Down Expand Up @@ -66,6 +69,7 @@
import org.apache.hugegraph.util.collection.ObjectIntMapping;
import org.apache.hugegraph.util.collection.ObjectIntMappingFactory;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -465,6 +469,13 @@ private Iterator<Edge> edgesOfVertex(Id source, EdgeStep edgeStep,
return edgeStep.skipSuperNodeIfNeeded(edges);
}

/**
 * Builds a batched iterator over the edges of multiple source vertices:
 * each element of the returned {@link EdgesIterator} is the edge iterator
 * of one source vertex.
 *
 * @param sources  ids of the source vertices
 * @param dir      edge direction(s) to traverse
 * @param labelIds edge-label ids to filter by
 * @param degree   max edges per source vertex (negative means no limit)
 */
public EdgesIterator edgesOfVertices(Iterator<Id> sources,
                                     Directions dir,
                                     List<Id> labelIds,
                                     long degree) {
    EdgesQueryIterator queries = new EdgesQueryIterator(sources, dir,
                                                        labelIds, degree);
    return new EdgesIterator(queries);
}

public Iterator<Edge> edgesOfVertex(Id source, Steps steps) {
List<Id> edgeLabels = steps.edgeLabels();
ConditionQuery cq = GraphTransaction.constructEdgesQuery(
Expand All @@ -474,6 +485,11 @@ public Iterator<Edge> edgesOfVertex(Id source, Steps steps) {
cq.limit(steps.limit());
}

if (steps.isEdgeEmpty()) {
Iterator<Edge> edges = this.graph().edges(cq);
return edgesOfVertexStep(edges, steps);
}

Map<Id, ConditionQuery> edgeConditions =
getFilterQueryConditions(steps.edgeSteps(), HugeType.EDGE);

Expand Down Expand Up @@ -1004,4 +1020,33 @@ public Set<Edge> getEdges(Iterator<Id> vertexIter) {
return edges;
}
}

/**
 * Iterator over per-source-vertex edge iterators, used for batch traversal.
 *
 * <p>NOTE(review): the constructor eagerly executes every query up front
 * (one backend call per source) rather than lazily on {@code next()};
 * presumably intentional for batching — confirm with the backend contract.
 */
public class EdgesIterator implements Iterator<Iterator<Edge>>, Closeable {

    private final Iterator<Iterator<Edge>> currentIter;

    public EdgesIterator(EdgesQueryIterator queries) {
        List<Iterator<Edge>> iteratorList = new ArrayList<>();
        while (queries.hasNext()) {
            Iterator<Edge> edges = graph.edges(queries.next());
            iteratorList.add(edges);
        }
        this.currentIter = iteratorList.iterator();
    }

    @Override
    public boolean hasNext() {
        return this.currentIter.hasNext();
    }

    @Override
    public Iterator<Edge> next() {
        return this.currentIter.next();
    }

    @Override
    public void close() throws IOException {
        /*
         * Fix a resource leak: the original only closed the outer iterator,
         * which is a plain ArrayList iterator and never Closeable, so the
         * backend edge iterators created in the constructor were never
         * released when the caller stopped early. Close every remaining
         * inner iterator before closing the outer one.
         */
        while (this.currentIter.hasNext()) {
            CloseableIterator.closeIterator(this.currentIter.next());
        }
        CloseableIterator.closeIterator(this.currentIter);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.hugegraph.traversal.algorithm;

import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;

import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.traversal.algorithm.records.KneighborRecords;
Expand All @@ -48,25 +48,27 @@ public Set<Id> kneighbor(Id sourceV, Directions dir,

Id labelId = this.getEdgeLabelId(label);

Set<Id> latest = newSet();
Set<Id> all = newSet();
KneighborRecords records = new KneighborRecords(true, sourceV, true);

latest.add(sourceV);
this.vertexIterCounter.addAndGet(1L);
Consumer<EdgeId> consumer = edgeId -> {
if (this.reachLimit(limit, records.size())) {
return;
}
records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
};

while (depth-- > 0) {
long remaining = limit == NO_LIMIT ? NO_LIMIT : limit - all.size();
latest = this.adjacentVertices(sourceV, latest, dir, labelId,
all, degree, remaining);
all.addAll(latest);
this.vertexIterCounter.addAndGet(1L);
this.edgeIterCounter.addAndGet(latest.size());
if (reachLimit(limit, all.size())) {
records.startOneLayer(true);
traverseIdsByBfs(records.keys(), dir, labelId, degree, NO_LIMIT, consumer);
records.finishOneLayer();
if (reachLimit(limit, records.size())) {
break;
}
}

return all;
this.vertexIterCounter.addAndGet(records.size());

return records.idsBySet(limit);
}

public KneighborRecords customizedKneighbor(Id source, Steps steps,
Expand All @@ -76,33 +78,29 @@ public KneighborRecords customizedKneighbor(Id source, Steps steps,
checkPositive(maxDepth, "k-neighbor max_depth");
checkLimit(limit);

boolean concurrent = maxDepth >= this.concurrentDepth();

KneighborRecords records = new KneighborRecords(concurrent,
KneighborRecords records = new KneighborRecords(true,
source, true);

Consumer<Id> consumer = v -> {
Consumer<Edge> consumer = edge -> {
if (this.reachLimit(limit, records.size())) {
return;
}
Iterator<Edge> edges = edgesOfVertex(v, steps);
this.vertexIterCounter.addAndGet(1L);
while (!this.reachLimit(limit, records.size()) && edges.hasNext()) {
HugeEdge edge = (HugeEdge) edges.next();
Id target = edge.id().otherVertexId();
records.addPath(v, target);

records.edgeResults().addEdge(v, target, edge);

this.edgeIterCounter.addAndGet(1L);
}
EdgeId edgeId = ((HugeEdge) edge).id();
records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge);
};

while (maxDepth-- > 0) {
records.startOneLayer(true);
traverseIds(records.keys(), consumer, concurrent);
traverseIdsByBfs(records.keys(), steps, NO_LIMIT, consumer);
records.finishOneLayer();
if (this.reachLimit(limit, records.size())) {
break;
}
}

this.vertexIterCounter.addAndGet(records.size());

return records;
}

Expand Down
Loading

0 comments on commit 8db0a9b

Please sign in to comment.