Skip to content

Commit

Permalink
feat: support load vertex/edge snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Radeity committed Sep 24, 2023
1 parent 15fc9bb commit 69fb95a
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,62 @@ public static synchronized ComputerOptions instance() {
""
);

/**
 * Boolean option registered under the key "snapshot.write".
 * Values are validated with {@code allowValues(true, false)}; the last
 * constructor argument is {@code false} (presumably the default value,
 * confirm against ConfigOption).
 */
public static final ConfigOption<Boolean> SNAPSHOT_WRITE =
new ConfigOption<>(
"snapshot.write",
"Whether write snapshot of input vertex and edge partitions",
allowValues(true, false),
false
);

/**
 * Boolean option registered under the key "snapshot.load".
 * Values are validated with {@code allowValues(true, false)}; the last
 * constructor argument is {@code false} (presumably the default value,
 * confirm against ConfigOption).
 */
public static final ConfigOption<Boolean> SNAPSHOT_LOAD =
new ConfigOption<>(
"snapshot.load",
"Whether use snapshot of input vertex and edge partitions",
allowValues(true, false),
false
);

/**
 * String option registered under the key "snapshot.view_key".
 * No validator is supplied ({@code null}) and the last constructor
 * argument is the empty string.
 */
public static final ConfigOption<String> SNAPSHOT_VIEW_KEY =
new ConfigOption<>(
"snapshot.view_key",
"View key of target snapshot",
null,
""
);

/**
 * String option registered under the key "snapshot.minio_endpoint".
 * No validator is supplied ({@code null}) and the last constructor
 * argument is the empty string.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_ENDPOINT =
new ConfigOption<>(
"snapshot.minio_endpoint",
"MinIO endpoint",
null,
""
);

/**
 * String option registered under the key "snapshot.minio_access_key".
 * No validator is supplied ({@code null}) and the last constructor
 * argument is the empty string.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_ACCESS_KEY =
new ConfigOption<>(
"snapshot.minio_access_key",
"MinIO access key",
null,
""
);

/**
 * String option registered under the key "snapshot.minio_secret_key".
 * No validator is supplied ({@code null}) and the last constructor
 * argument is the empty string.
 * NOTE(review): the value is a credential; verify it is never written to
 * logs or dumped together with the rest of the configuration.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_SECRET_KEY =
new ConfigOption<>(
"snapshot.minio_secret_key",
"MinIO secret key",
null,
""
);

/**
 * String option registered under the key "snapshot.minio_bucket_name".
 * No validator is supplied ({@code null}) and the last constructor
 * argument is the empty string.
 */
public static final ConfigOption<String> SNAPSHOT_MINIO_BUCKET_NAME =
new ConfigOption<>(
"snapshot.minio_bucket_name",
"MinIO bucket name",
null,
""
);

public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
new ConfigOption<>(
"input.send_thread_nums",
Expand Down
5 changes: 5 additions & 0 deletions computer-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
Expand All @@ -60,9 +61,13 @@ public class WorkerInputManager implements Manager {
*/
private final MessageSendManager sendManager;

private final SnapshotManager snapshotManager;

public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
MessageSendManager sendManager,
SnapshotManager snapshotManager) {
this.sendManager = sendManager;
this.snapshotManager = snapshotManager;

this.sendThreadNum = this.inputSendThreadNum(context.config());
this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
Expand Down Expand Up @@ -103,11 +108,16 @@ public void service(InputSplitRpcService rpcService) {
* but there is no guarantee that all of them have been received.
*/
public void loadGraph() {
if (this.snapshotManager.loadSnapshot()) {
this.snapshotManager.load();
return;
}

List<CompletableFuture<?>> futures = new ArrayList<>();
CompletableFuture<?> future;
this.sendManager.startSend(MessageType.VERTEX);
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
future = this.send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand All @@ -116,11 +126,10 @@ public void loadGraph() {
}).join();
this.sendManager.finishSend(MessageType.VERTEX);

futures.clear();

this.sendManager.startSend(MessageType.EDGE);
futures.clear();
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
future = this.send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public FileRegionBuffer(int length) {
this.length = length;
}

/**
 * Creates a buffer description with both its length and its path
 * already assigned.
 *
 * @param length value stored in the {@code length} field
 * @param path   value stored in the {@code path} field
 */
public FileRegionBuffer(int length, String path) {
    // The two assignments are independent of each other.
    this.path = path;
    this.length = length;
}

/**
* Use zero-copy transform from socket channel to file
* @param channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions;
import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class MessageRecvManager implements Manager, MessageHandler {
private int expectedFinishMessages;
private CompletableFuture<Void> finishMessagesFuture;
private AtomicInteger finishMessagesCount;
private SnapshotManager snapshotManager;

private long waitFinishMessagesTimeout;
private long superstep;
Expand All @@ -90,9 +92,9 @@ public void init(Config config) {
this.fileManager,
Constants.INPUT_SUPERSTEP);
this.vertexPartitions = new VertexMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.context, fileGenerator, this.sortManager, this.snapshotManager);
this.edgePartitions = new EdgeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.context, fileGenerator, this.sortManager, this.snapshotManager);
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
Expand All @@ -108,7 +110,7 @@ public void beforeSuperstep(Config config, int superstep) {
SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator(
this.fileManager, superstep);
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.context, fileGenerator, this.sortManager, this.snapshotManager);
this.expectedFinishMessages = this.workerCount;
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);
Expand Down Expand Up @@ -245,4 +247,8 @@ public Map<Integer, MessageStat> messageStats() {
"The messagePartitions can't be null");
return this.messagePartitions.messageStats();
}

/**
 * Stores the snapshot manager that this receive manager hands to the
 * partition collections it creates.
 * NOTE(review): the field is only read when partitions are constructed,
 * so this presumably has to be called before init(); confirm with callers.
 *
 * @param snapshotManager the manager to keep in the snapshotManager field
 */
public void setSnapshotManager(SnapshotManager snapshotManager) {
this.snapshotManager = snapshotManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public synchronized long totalBytes() {
return this.totalBytes;
}

/**
 * Returns the output files currently held by this partition.
 *
 * <p>The result is a copy taken while the partition's monitor is held.
 * Returning the field itself would let callers read or modify the list
 * after the lock is released, which defeats the {@code synchronized}
 * guard on this accessor.
 *
 * @return a new list holding the current output files
 */
public synchronized List<String> outputFiles() {
    // Fully qualified so the block does not depend on an extra import.
    return new java.util.ArrayList<>(this.outputFiles);
}

public synchronized MessageStat messageStat() {
// TODO: count the message received
return new MessageStat(0L, this.totalBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,48 @@

package org.apache.hugegraph.computer.core.receiver;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class MessageRecvPartitions<P extends MessageRecvPartition> {

protected final ComputerContext context;
protected final Config config;
protected final SuperstepFileGenerator fileGenerator;
protected final SortManager sortManager;
protected final SnapshotManager snapshotManager;

// The map of partition-id and the messages for the partition.
private final Map<Integer, P> partitions;

public MessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
SortManager sortManager,
SnapshotManager snapshotManager) {
this.context = context;
this.config = context.config();
this.fileGenerator = fileGenerator;
this.sortManager = sortManager;
this.snapshotManager = snapshotManager;
this.partitions = new HashMap<>();
}

/**
 * Creates a new partition of the concrete type {@code P}; each subclass
 * supplies its own partition implementation.
 *
 * @return a newly created partition
 */
protected abstract P createPartition();

/**
 * Hook that lets a subclass persist a snapshot of one partition.
 * It is called from {@code iterators()} once per partition, right after
 * that partition's iterator has been obtained.
 *
 * @param partitionId id of the partition being processed
 * @param outputFiles the value returned by that partition's outputFiles()
 */
protected abstract void writePartitionSnapshot(int partitionId, List<String> outputFiles);

public void addBuffer(int partitionId, NetworkBuffer buffer) {
P partition = this.partition(partitionId);
partition.addBuffer(buffer);
Expand Down Expand Up @@ -87,6 +93,7 @@ public Map<Integer, PeekableIterator<KvEntry>> iterators() {
Map<Integer, PeekableIterator<KvEntry>> entries = new HashMap<>();
for (Map.Entry<Integer, P> entry : this.partitions.entrySet()) {
entries.put(entry.getKey(), entry.getValue().iterator());
this.writePartitionSnapshot(entry.getKey(), entry.getValue().outputFiles());
}
return entries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@
package org.apache.hugegraph.computer.core.receiver.edge;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class EdgeMessageRecvPartitions
extends MessageRecvPartitions<EdgeMessageRecvPartition> {

public EdgeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

/**
 * Builds a new edge partition that shares this collection's context,
 * file generator and sort manager.
 *
 * @return a newly constructed {@code EdgeMessageRecvPartition}
 */
@Override
public EdgeMessageRecvPartition createPartition() {
    EdgeMessageRecvPartition edgePartition = new EdgeMessageRecvPartition(
            this.context, this.fileGenerator, this.sortManager);
    return edgePartition;
}

/**
 * Hands the output files of one edge partition to the snapshot manager
 * for upload when snapshot writing is enabled.
 *
 * <p>Does nothing when no snapshot manager is available or when
 * {@code writeSnapshot()} returns false.
 *
 * @param partitionId id of the partition the files belong to
 * @param outputFiles files passed on to {@code SnapshotManager.upload}
 */
@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
    SnapshotManager manager = this.snapshotManager;
    // The manager reaches this class through a setter on the receive
    // manager, so it may be null if that setter was never called.
    if (manager == null || !manager.writeSnapshot()) {
        return;
    }
    manager.upload(MessageType.EDGE, partitionId, outputFiles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,30 @@

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class ComputeMessageRecvPartitions
extends MessageRecvPartitions<ComputeMessageRecvPartition> {

public ComputeMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

/**
 * Builds a new compute-message partition that shares this collection's
 * context, file generator and sort manager.
 *
 * @return a newly constructed {@code ComputeMessageRecvPartition}
 */
@Override
public ComputeMessageRecvPartition createPartition() {
    ComputeMessageRecvPartition messagePartition = new ComputeMessageRecvPartition(
            this.context, this.fileGenerator, this.sortManager);
    return messagePartition;
}

/**
 * Intentionally empty: this implementation writes no snapshot for
 * compute-message partitions.
 *
 * @param partitionId ignored
 * @param outputFiles ignored
 */
@Override
protected void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
// No-op on purpose; neither argument is used.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,35 @@
package org.apache.hugegraph.computer.core.receiver.vertex;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;

import java.util.List;

public class VertexMessageRecvPartitions
extends MessageRecvPartitions<VertexMessageRecvPartition> {


public VertexMessageRecvPartitions(ComputerContext context,
SuperstepFileGenerator fileGenerator,
SortManager sortManager) {
super(context, fileGenerator, sortManager);
SortManager sortManager,
SnapshotManager snapshotManager) {
super(context, fileGenerator, sortManager, snapshotManager);
}

/**
 * Builds a new vertex partition that shares this collection's context,
 * file generator and sort manager.
 *
 * @return a newly constructed {@code VertexMessageRecvPartition}
 */
@Override
public VertexMessageRecvPartition createPartition() {
    VertexMessageRecvPartition vertexPartition = new VertexMessageRecvPartition(
            this.context, this.fileGenerator, this.sortManager);
    return vertexPartition;
}

/**
 * Hands the output files of one vertex partition to the snapshot manager
 * for upload when snapshot writing is enabled.
 *
 * <p>Does nothing when no snapshot manager is available or when
 * {@code writeSnapshot()} returns false.
 *
 * @param partitionId id of the partition the files belong to
 * @param outputFiles files passed on to {@code SnapshotManager.upload}
 */
@Override
public void writePartitionSnapshot(int partitionId, List<String> outputFiles) {
    SnapshotManager manager = this.snapshotManager;
    // The manager reaches this class through a setter on the receive
    // manager, so it may be null if that setter was never called.
    if (manager == null || !manager.writeSnapshot()) {
        return;
    }
    manager.upload(MessageType.VERTEX, partitionId, outputFiles);
}
}
Loading

0 comments on commit 69fb95a

Please sign in to comment.