From 9a2fdb1911a29150a5f6be22be1570313f130585 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Sun, 24 Sep 2023 18:40:33 +0800 Subject: [PATCH 1/6] feat: support load vertex/edge snapshot --- .../computer/core/config/ComputerOptions.java | 56 ++++ computer-core/pom.xml | 5 + .../core/input/WorkerInputManager.java | 21 +- .../core/network/buffer/FileRegionBuffer.java | 5 + .../core/receiver/MessageRecvManager.java | 12 +- .../core/receiver/MessageRecvPartition.java | 4 + .../core/receiver/MessageRecvPartitions.java | 19 +- .../edge/EdgeMessageRecvPartitions.java | 16 +- .../message/ComputeMessageRecvPartitions.java | 13 +- .../vertex/VertexMessageRecvPartitions.java | 16 +- .../core/snapshot/SnapshotManager.java | 256 ++++++++++++++++++ .../computer/core/worker/WorkerService.java | 10 +- pom.xml | 1 + 13 files changed, 413 insertions(+), 21 deletions(-) create mode 100644 computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index 33fba83d9..7fa42c266 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -198,6 +198,62 @@ public static synchronized ComputerOptions instance() { "" ); + public static final ConfigOption SNAPSHOT_WRITE = + new ConfigOption<>( + "snapshot.write", + "Whether write snapshot of input vertex and edge partitions", + allowValues(true, false), + false + ); + + public static final ConfigOption SNAPSHOT_LOAD = + new ConfigOption<>( + "snapshot.load", + "Whether use snapshot of input vertex and edge partitions", + allowValues(true, false), + false + ); + + public static final ConfigOption SNAPSHOT_VIEW_KEY = + new ConfigOption<>( + "snapshot.view_key", + "View key of target snapshot", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_ENDPOINT = + new ConfigOption<>( + "snapshot.minio_endpoint", + "MinIO endpoint", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_ACCESS_KEY = + new ConfigOption<>( + "snapshot.minio_access_key", + "MinIO access key", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_SECRET_KEY = + new ConfigOption<>( + "snapshot.minio_secret_key", + "MinIO secret key", + null, + "" + ); + + public static final ConfigOption SNAPSHOT_MINIO_BUCKET_NAME = + new ConfigOption<>( + "snapshot.minio_bucket_name", + "MinIO bucket name", + null, + "" + ); + public static final ConfigOption INPUT_SEND_THREAD_NUMS = new ConfigOption<>( "input.send_thread_nums", diff --git a/computer-core/pom.xml b/computer-core/pom.xml index 6344acbb2..ee53458e6 100644 --- a/computer-core/pom.xml +++ b/computer-core/pom.xml @@ -60,6 +60,11 @@ org.apache.hadoop hadoop-common + + io.minio + minio + ${minio-version} + org.apache.hadoop hadoop-hdfs-client diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java index c5e4a766c..1a3cd2c86 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java @@ -34,6 +34,7 @@ import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.worker.load.LoadService; import org.apache.hugegraph.util.ExecutorUtil; import org.apache.hugegraph.util.Log; @@ -60,9 +61,13 @@ public class WorkerInputManager implements Manager { */ private final MessageSendManager sendManager; + private final SnapshotManager snapshotManager; + public WorkerInputManager(ComputerContext context, - MessageSendManager sendManager) { + MessageSendManager sendManager, + SnapshotManager snapshotManager) { this.sendManager = sendManager; + this.snapshotManager = snapshotManager; this.sendThreadNum = this.inputSendThreadNum(context.config()); this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX); @@ -103,11 +108,17 @@ public void service(InputSplitRpcService rpcService) { * but there is no guarantee that all of them has been received. */ public void loadGraph() { + if (this.snapshotManager.loadSnapshot()) { + this.snapshotManager.load(); + return; + } + List> futures = new ArrayList<>(); CompletableFuture future; this.sendManager.startSend(MessageType.VERTEX); for (int i = 0; i < this.sendThreadNum; i++) { - future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex); + future = this.send(this.sendManager::sendVertex, + this.loadService::createIteratorFromVertex); futures.add(future); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> { @@ -116,11 +127,11 @@ public void loadGraph() { }).join(); this.sendManager.finishSend(MessageType.VERTEX); - futures.clear(); - this.sendManager.startSend(MessageType.EDGE); + futures.clear(); for (int i = 0; i < this.sendThreadNum; i++) { - future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge); + future = this.send(this.sendManager::sendEdge, + this.loadService::createIteratorFromEdge); futures.add(future); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> { diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java index ce1961467..8b597de42 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/buffer/FileRegionBuffer.java @@ -43,6 +43,11 @@ public FileRegionBuffer(int length) { this.length = length; } + public FileRegionBuffer(int length, String path) { + this.length = length; + this.path = path; + } + /** * Use zero-copy transform from socket channel to file * @param channel diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index b77ffa807..a2f37c259 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -38,6 +38,7 @@ import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions; import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions; import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.FileManager; @@ -65,6 +66,7 @@ public class MessageRecvManager implements Manager, MessageHandler { private int expectedFinishMessages; private CompletableFuture finishMessagesFuture; private AtomicInteger finishMessagesCount; + private SnapshotManager snapshotManager; private long waitFinishMessagesTimeout; private long superstep; @@ -90,9 +92,9 @@ public void init(Config config) { this.fileManager, Constants.INPUT_SUPERSTEP); this.vertexPartitions = new VertexMessageRecvPartitions( - this.context, fileGenerator, this.sortManager); + this.context, fileGenerator, this.sortManager, this.snapshotManager); this.edgePartitions = new EdgeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager); + this.context, fileGenerator, this.sortManager, this.snapshotManager); this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; @@ -108,7 +110,7 @@ public void beforeSuperstep(Config config, int superstep) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, superstep); this.messagePartitions = new ComputeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager); + this.context, fileGenerator, this.sortManager, this.snapshotManager); this.expectedFinishMessages = this.workerCount; this.finishMessagesFuture = new CompletableFuture<>(); this.finishMessagesCount.set(this.expectedFinishMessages); @@ -245,4 +247,8 @@ public Map messageStats() { "The messagePartitions can't be null"); return this.messagePartitions.messageStats(); } + + public void setSnapshotManager(SnapshotManager snapshotManager) { + this.snapshotManager = snapshotManager; + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java index d63d5dab3..ce0ada3e3 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartition.java @@ -127,6 +127,10 @@ public synchronized long totalBytes() { return this.totalBytes; } + public synchronized List outputFiles() { + return this.outputFiles; + } + public synchronized MessageStat messageStat() { // TODO: count the message received return new MessageStat(0L, this.totalBytes); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java index 362b1425e..222fff2c7 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvPartitions.java @@ -17,42 +17,48 @@ package org.apache.hugegraph.computer.core.receiver; -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.io.FileUtils; import org.apache.hugegraph.computer.core.common.ComputerContext; import org.apache.hugegraph.computer.core.config.Config; import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; import org.apache.hugegraph.computer.core.store.entry.KvEntry; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public abstract class MessageRecvPartitions

{ protected final ComputerContext context; protected final Config config; protected final SuperstepFileGenerator fileGenerator; protected final SortManager sortManager; + protected final SnapshotManager snapshotManager; // The map of partition-id and the messages for the partition. private final Map partitions; public MessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { + SortManager sortManager, + SnapshotManager snapshotManager) { this.context = context; this.config = context.config(); this.fileGenerator = fileGenerator; this.sortManager = sortManager; + this.snapshotManager = snapshotManager; this.partitions = new HashMap<>(); } protected abstract P createPartition(); + protected abstract void writePartitionSnapshot(int partitionId, List outputFiles); + public void addBuffer(int partitionId, NetworkBuffer buffer) { P partition = this.partition(partitionId); partition.addBuffer(buffer); @@ -87,6 +93,7 @@ public Map> iterators() { Map> entries = new HashMap<>(); for (Map.Entry entry : this.partitions.entrySet()) { entries.put(entry.getKey(), entry.getValue().iterator()); + this.writePartitionSnapshot(entry.getKey(), entry.getValue().outputFiles()); } return entries; } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java index 705a1a204..558bfaa2b 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitions.java @@ -18,17 +18,22 @@ package org.apache.hugegraph.computer.core.receiver.edge; import org.apache.hugegraph.computer.core.common.ComputerContext; +import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; +import java.util.List; + public class EdgeMessageRecvPartitions extends MessageRecvPartitions { public EdgeMessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { - super(context, fileGenerator, sortManager); + SortManager sortManager, + SnapshotManager snapshotManager) { + super(context, fileGenerator, sortManager, snapshotManager); } @Override @@ -36,4 +41,11 @@ public EdgeMessageRecvPartition createPartition() { return new EdgeMessageRecvPartition(this.context, this.fileGenerator, this.sortManager); } + + @Override + public void writePartitionSnapshot(int partitionId, List outputFiles) { + if (this.snapshotManager.writeSnapshot()) { + this.snapshotManager.upload(MessageType.EDGE, partitionId, outputFiles); + } + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java index 618f4e439..a3e3e3584 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitions.java @@ -19,16 +19,20 @@ import org.apache.hugegraph.computer.core.common.ComputerContext; import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; +import java.util.List; + public class ComputeMessageRecvPartitions extends MessageRecvPartitions { public ComputeMessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { - super(context, fileGenerator, sortManager); + SortManager sortManager, + SnapshotManager snapshotManager) { + super(context, fileGenerator, sortManager, snapshotManager); } @Override @@ -36,4 +40,9 @@ public ComputeMessageRecvPartition createPartition() { return new ComputeMessageRecvPartition(this.context, this.fileGenerator, this.sortManager); } + + @Override + protected void writePartitionSnapshot(int partitionId, List outputFiles) { + // pass + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java index 7046f3fa7..982ae7409 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitions.java @@ -18,18 +18,23 @@ package org.apache.hugegraph.computer.core.receiver.vertex; import org.apache.hugegraph.computer.core.common.ComputerContext; +import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.receiver.MessageRecvPartitions; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator; +import java.util.List; + public class VertexMessageRecvPartitions extends MessageRecvPartitions { public VertexMessageRecvPartitions(ComputerContext context, SuperstepFileGenerator fileGenerator, - SortManager sortManager) { - super(context, fileGenerator, sortManager); + SortManager sortManager, + SnapshotManager snapshotManager) { + super(context, fileGenerator, sortManager, snapshotManager); } @Override @@ -37,4 +42,11 @@ public VertexMessageRecvPartition createPartition() { return new VertexMessageRecvPartition(this.context, this.fileGenerator, this.sortManager); } + + @Override + public void writePartitionSnapshot(int partitionId, List outputFiles) { + if (this.snapshotManager.writeSnapshot()) { + this.snapshotManager.upload(MessageType.VERTEX, partitionId, outputFiles); + } + } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java new file mode 100644 index 000000000..8956ded90 --- /dev/null +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.core.snapshot; + +import io.minio.DownloadObjectArgs; +import io.minio.ListObjectsArgs; +import io.minio.MinioClient; +import io.minio.RemoveObjectsArgs; +import io.minio.Result; +import io.minio.UploadObjectArgs; +import io.minio.messages.DeleteError; +import io.minio.messages.DeleteObject; +import io.minio.messages.Item; +import org.apache.commons.lang.StringUtils; +import org.apache.hugegraph.computer.core.common.ComputerContext; +import org.apache.hugegraph.computer.core.common.ContainerInfo; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.ComputerOptions; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.partition.Partitioner; +import org.apache.hugegraph.computer.core.manager.Manager; +import org.apache.hugegraph.computer.core.network.buffer.FileRegionBuffer; +import org.apache.hugegraph.computer.core.network.message.MessageType; +import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; +import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +import java.io.File; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; + +public class SnapshotManager implements Manager { + + private static final Logger LOG = Log.logger(SnapshotManager.class); + public static final String NAME = "worker_snapshot"; + + private final MessageSendManager sendManager; + private final MessageRecvManager recvManager; + + private final ContainerInfo workerInfo; + private final Partitioner partitioner; + private final int partitionCount; + private final boolean loadSnapshot; + private final boolean writeSnapshot; + private final String viewKey; + + private MinioClient minioClient; + private String bucketName; + + public SnapshotManager(ComputerContext context, MessageSendManager sendManager, + MessageRecvManager recvManager, ContainerInfo workerInfo) { + this.loadSnapshot = context.config().get(ComputerOptions.SNAPSHOT_LOAD); + this.writeSnapshot = context.config().get(ComputerOptions.SNAPSHOT_WRITE); + + this.sendManager = sendManager; + this.recvManager = recvManager; + this.recvManager.setSnapshotManager(this); + + this.workerInfo = workerInfo; + this.partitioner = context.config().createObject( + ComputerOptions.WORKER_PARTITIONER); + this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT); + this.viewKey = context.config().get(ComputerOptions.SNAPSHOT_VIEW_KEY); + } + + @Override + public String name() { + return NAME; + } + + @Override + public void init(Config config) { + String endpoint = config.get(ComputerOptions.SNAPSHOT_MINIO_ENDPOINT); + String accessKey = config.get(ComputerOptions.SNAPSHOT_MINIO_ACCESS_KEY); + String secretKey = config.get(ComputerOptions.SNAPSHOT_MINIO_SECRET_KEY); + this.bucketName = config.get(ComputerOptions.SNAPSHOT_MINIO_BUCKET_NAME); + if (StringUtils.isNotEmpty(endpoint)) { + this.minioClient = MinioClient.builder() + .endpoint(endpoint) + .credentials(accessKey, secretKey) + .build(); + } + } + + @Override + public void close(Config config) { + // pass + } + + public boolean loadSnapshot() { + return this.loadSnapshot; + } + + public boolean writeSnapshot() { + return this.writeSnapshot; + } + + public void upload(MessageType messageType, int partitionId, List outputFiles) { + if (this.loadSnapshot()) { + LOG.info("No later {} snapshots have to be uploaded", + messageType.name().toLowerCase(Locale.ROOT)); + return; + } + this.uploadObjects(messageType, partitionId, outputFiles); + } + + public void load() { + int id = this.workerInfo.id(); + for (int partitionId = 0; partitionId < this.partitionCount; partitionId++) { + if (this.partitioner.workerId(partitionId) == id) { + // TODO: Do not need to send control message to all workers + this.sendManager.startSend(MessageType.VERTEX); + this.downloadObjects(MessageType.VERTEX, partitionId); + this.sendManager.finishSend(MessageType.VERTEX); + + this.sendManager.startSend(MessageType.EDGE); + this.downloadObjects(MessageType.EDGE, partitionId); + this.sendManager.finishSend(MessageType.EDGE); + } + } + } + + private void uploadObjects(MessageType messageType, int partitionId, + List outputFiles) { + String dirName = this.generateObjectDirName(messageType, partitionId); + + try { + this.clearObjectsIfExist(dirName); + } catch (Exception e) { + throw new ComputerException("Clear out-dated snapshots from %s failed", dirName, e); + } + + LOG.info("Upload {} snapshots for partition {}", + messageType.name().toLowerCase(Locale.ROOT), partitionId); + for (String outputFile : outputFiles) { + String objectName = dirName + new File(outputFile).getName(); + this.uploadObject(outputFile, objectName); + } + } + + private void downloadObjects(MessageType messageType, int partitionId) { + LOG.info("Load {} snapshots for partition {}", + messageType.name().toLowerCase(Locale.ROOT), partitionId); + String dirName = this.generateObjectDirName(messageType, partitionId); + + try { + Iterable> snapshotFiles = this.minioClient.listObjects( + ListObjectsArgs.builder() + .bucket(this.bucketName) + .prefix(dirName) + .build()); + + if (!snapshotFiles.iterator().hasNext()) { + throw new ComputerException("Empty snapshot directory %s", dirName); + } + + for (Result result : snapshotFiles) { + Item item = result.get(); + int size = (int) item.size(); + String objectName = item.objectName(); + + String outputPath = this.recvManager.genOutputPath(messageType, partitionId); + this.downloadObject(objectName, outputPath); + + FileRegionBuffer fileRegionBuffer = new FileRegionBuffer(size, outputPath); + this.recvManager.handle(messageType, partitionId, fileRegionBuffer); + } + } catch (Exception e) { + throw new ComputerException("Download snapshots from %s failed", dirName, e); + } + } + + private void uploadObject(String fileName, String objectName) { + try { + this.minioClient.uploadObject(UploadObjectArgs.builder() + .bucket(this.bucketName) + .object(objectName) + .filename(fileName) + .build()); + } catch (Exception e) { + throw new ComputerException("Upload snapshot %s to %s failed", + fileName, objectName, e); + } + } + + private void downloadObject(String objectName, String outputPath) { + try { + this.minioClient.downloadObject( + DownloadObjectArgs.builder() + .bucket(this.bucketName) + .object(objectName) + .filename(outputPath) + .build()); + } catch (Exception e) { + throw new ComputerException("Download snapshot from %s to %s failed", + objectName, outputPath, e); + } + } + + private void clearObjectsIfExist(String dirName) throws Exception { + List objects = new LinkedList<>(); + Iterable> snapshotFiles = this.minioClient.listObjects( + ListObjectsArgs.builder() + .bucket(this.bucketName) + .prefix(dirName) + .build()); + if (!snapshotFiles.iterator().hasNext()) { + return; + } + + LOG.info("Clear out-dated snapshots from {} first", dirName); + for (Result result : snapshotFiles) { + Item item = result.get(); + objects.add(new DeleteObject(item.objectName())); + } + Iterable> results = + minioClient.removeObjects( + RemoveObjectsArgs.builder() + .bucket(this.bucketName) + .objects(objects) + .build()); + for (Result result : results) { + DeleteError error = result.get(); + throw new ComputerException ( + "Error in deleting snapshot " + error.objectName() + "; " + error.message()); + } + } + + private String generateObjectDirName(MessageType messageType, int partitionId) { + // dir name: {VIEW_KEY}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ + return Paths.get( + this.viewKey, + this.partitioner.getClass().getSimpleName(), + String.valueOf(this.partitionCount), + messageType.name(), + String.valueOf(partitionId)).toString() + "/"; + } +} diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java index 901d9ad2e..48af87c66 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java @@ -47,6 +47,7 @@ import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.rpc.WorkerRpcManager; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -322,8 +323,15 @@ private InetSocketAddress initManagers(ContainerInfo masterInfo) { clientManager.sender()); this.managers.add(sendManager); + SnapshotManager snapshotManager = new SnapshotManager(this.context, + sendManager, + recvManager, + this.workerInfo); + this.managers.add(snapshotManager); + WorkerInputManager inputManager = new WorkerInputManager(this.context, - sendManager); + sendManager, + snapshotManager); inputManager.service(rpcManager.inputSplitService()); this.managers.add(inputManager); diff --git a/pom.xml b/pom.xml index 028af9b3a..2784670be 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 1.0.0 1.0.0 1.0.0 + 8.5.6 From 6e1e4441440b6125147e88d0b2ae63ff57b1b663 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Mon, 25 Sep 2023 09:33:51 +0800 Subject: [PATCH 2/6] fix format --- .../core/snapshot/SnapshotManager.java | 84 +++++++++---------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java index 8956ded90..b63935828 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import java.io.File; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; @@ -75,8 +76,7 @@ public SnapshotManager(ComputerContext context, MessageSendManager sendManager, this.recvManager.setSnapshotManager(this); this.workerInfo = workerInfo; - this.partitioner = context.config().createObject( - ComputerOptions.WORKER_PARTITIONER); + this.partitioner = context.config().createObject(ComputerOptions.WORKER_PARTITIONER); this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT); this.viewKey = context.config().get(ComputerOptions.SNAPSHOT_VIEW_KEY); } @@ -94,9 +94,9 @@ public void init(Config config) { this.bucketName = config.get(ComputerOptions.SNAPSHOT_MINIO_BUCKET_NAME); if (StringUtils.isNotEmpty(endpoint)) { this.minioClient = MinioClient.builder() - .endpoint(endpoint) - .credentials(accessKey, secretKey) - .build(); + .endpoint(endpoint) + .credentials(accessKey, secretKey) + .build(); } } @@ -116,7 +116,7 @@ public boolean writeSnapshot() { public void upload(MessageType messageType, int partitionId, List outputFiles) { if (this.loadSnapshot()) { LOG.info("No later {} snapshots have to be uploaded", - messageType.name().toLowerCase(Locale.ROOT)); + messageType.name().toLowerCase(Locale.ROOT)); return; } this.uploadObjects(messageType, partitionId, outputFiles); @@ -145,11 +145,11 @@ private void uploadObjects(MessageType messageType, int partitionId, try { this.clearObjectsIfExist(dirName); } catch (Exception e) { - throw new ComputerException("Clear out-dated snapshots from %s failed", dirName, e); + throw new ComputerException("Failed to clear out-dated snapshots from %s", dirName, e); } LOG.info("Upload {} snapshots for partition {}", - messageType.name().toLowerCase(Locale.ROOT), partitionId); + messageType.name().toLowerCase(Locale.ROOT), partitionId); for (String outputFile : outputFiles) { String objectName = dirName + new File(outputFile).getName(); this.uploadObject(outputFile, objectName); @@ -158,15 +158,15 @@ private void uploadObjects(MessageType messageType, int partitionId, private void downloadObjects(MessageType messageType, int partitionId) { LOG.info("Load {} snapshots for partition {}", - messageType.name().toLowerCase(Locale.ROOT), partitionId); + messageType.name().toLowerCase(Locale.ROOT), partitionId); String dirName = this.generateObjectDirName(messageType, partitionId); try { Iterable> snapshotFiles = this.minioClient.listObjects( ListObjectsArgs.builder() - .bucket(this.bucketName) - .prefix(dirName) - .build()); + .bucket(this.bucketName) + .prefix(dirName) + .build()); if (!snapshotFiles.iterator().hasNext()) { throw new ComputerException("Empty snapshot directory %s", dirName); @@ -184,34 +184,33 @@ private void downloadObjects(MessageType messageType, int partitionId) { this.recvManager.handle(messageType, partitionId, fileRegionBuffer); } } catch (Exception e) { - throw new ComputerException("Download snapshots from %s failed", dirName, e); + throw new ComputerException("Failed to download snapshots from %s", dirName, e); } } private void uploadObject(String fileName, String objectName) { try { this.minioClient.uploadObject(UploadObjectArgs.builder() - .bucket(this.bucketName) - .object(objectName) - .filename(fileName) - .build()); + .bucket(this.bucketName) + .object(objectName) + .filename(fileName) + .build()); } catch (Exception e) { - throw new ComputerException("Upload snapshot %s to %s failed", - fileName, objectName, e); + throw new ComputerException("Failed to upload snapshot %s to %s", + fileName, objectName, e); } } private void downloadObject(String objectName, String outputPath) { try { - this.minioClient.downloadObject( - DownloadObjectArgs.builder() - .bucket(this.bucketName) - .object(objectName) - .filename(outputPath) - .build()); + this.minioClient.downloadObject(DownloadObjectArgs.builder() + .bucket(this.bucketName) + .object(objectName) + .filename(outputPath) + .build()); } catch (Exception e) { - throw new ComputerException("Download snapshot from %s to %s failed", - objectName, outputPath, e); + throw new ComputerException("Failed to download snapshot from %s to %s", + objectName, outputPath, e); } } @@ -219,9 +218,9 @@ private void clearObjectsIfExist(String dirName) throws Exception { List objects = new LinkedList<>(); Iterable> snapshotFiles = this.minioClient.listObjects( ListObjectsArgs.builder() - .bucket(this.bucketName) - .prefix(dirName) - .build()); + .bucket(this.bucketName) + .prefix(dirName) + .build()); if (!snapshotFiles.iterator().hasNext()) { return; } @@ -232,25 +231,24 @@ private void clearObjectsIfExist(String dirName) throws Exception { objects.add(new DeleteObject(item.objectName())); } Iterable> results = - minioClient.removeObjects( - RemoveObjectsArgs.builder() - .bucket(this.bucketName) - .objects(objects) - .build()); + minioClient.removeObjects(RemoveObjectsArgs.builder() + .bucket(this.bucketName) + .objects(objects) + .build()); for (Result result : results) { DeleteError error = result.get(); - throw new ComputerException ( - "Error in deleting snapshot " + error.objectName() + "; " + error.message()); + throw new ComputerException("Failed to delete snapshot %s, error message: %s", + error.objectName(), error.message()); } } private String generateObjectDirName(MessageType messageType, int partitionId) { // dir name: {VIEW_KEY}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ - return Paths.get( - this.viewKey, - this.partitioner.getClass().getSimpleName(), - String.valueOf(this.partitionCount), - messageType.name(), - String.valueOf(partitionId)).toString() + "/"; + Path path = Paths.get(this.viewKey, + this.partitioner.getClass().getSimpleName(), + String.valueOf(this.partitionCount), + messageType.name(), + String.valueOf(partitionId)); + return path + "/"; } } From d7c31bacd047e5e98f112922cb0d8cd2a6cff313 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Mon, 25 Sep 2023 10:08:07 +0800 Subject: [PATCH 3/6] fix UT --- .../hugegraph/computer/core/compute/ComputeManagerTest.java | 6 ++++++ .../computer/core/compute/input/EdgesInputTest.java | 6 ++++++ .../computer/core/compute/input/MessageInputTest.java | 6 ++++++ .../computer/core/network/DataServerManagerTest.java | 5 +++++ .../computer/core/receiver/MessageRecvManagerTest.java | 6 ++++++ 5 files changed, 29 insertions(+) diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java index c4052497b..a43baef84 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java @@ -47,6 +47,7 @@ import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.receiver.ReceiverUtil; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.FileManager; @@ -104,6 +105,11 @@ public void setup() { fileManager, sortManager); this.managers.add(receiveManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); + this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java index c2126d1bd..b6c427596 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java @@ -51,6 +51,7 @@ import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.receiver.ReceiverUtil; import org.apache.hugegraph.computer.core.sender.MessageSendManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.SendSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -137,6 +138,11 @@ private void testEdgeFreq(EdgeFrequency freq) fileManager, sortManager); this.managers.add(receiveManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); + this.managers.add(snapshotManager); this.managers.initAll(this.config); ConnectionId connectionId = new ConnectionId(new InetSocketAddress( "localhost", 8081), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java index 36361380f..3889f80f7 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java @@ -41,6 +41,7 @@ import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; import org.apache.hugegraph.computer.core.receiver.ReceiverUtil; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -90,6 +91,11 @@ public void setup() { fileManager, sortManager); this.managers.add(receiveManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); + this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java index 89029ec14..6c8673c54 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java @@ -25,6 +25,7 @@ import org.apache.hugegraph.computer.core.network.connection.ConnectionManager; import org.apache.hugegraph.computer.core.network.connection.TransportConnectionManager; import org.apache.hugegraph.computer.core.receiver.MessageRecvManager; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; import org.apache.hugegraph.computer.core.store.FileManager; @@ -55,6 +56,10 @@ public void test() { MessageRecvManager recvManager = new MessageRecvManager(context(), fileManager, sortManager); + SnapshotManager snapshotManager = new SnapshotManager(context(), + null, + recvManager, + null); recvManager.init(config); ConnectionManager connManager = new TransportConnectionManager(); DataServerManager serverManager = new DataServerManager(connManager, diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java index 3942b4eff..8887b7395 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java @@ -32,6 +32,7 @@ import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitionTest; import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitionTest; import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitionTest; +import org.apache.hugegraph.computer.core.snapshot.SnapshotManager; import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator; import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager; import org.apache.hugegraph.computer.core.sort.sorting.SortManager; @@ -49,6 +50,7 @@ public class MessageRecvManagerTest extends UnitTestBase { private FileManager fileManager; private SortManager sortManager; private MessageRecvManager receiveManager; + private SnapshotManager snapshotManager; private ConnectionId connectionId; @Before @@ -73,6 +75,10 @@ public void setup() { this.receiveManager = new MessageRecvManager(context(), this.fileManager, this.sortManager); + this.snapshotManager = new SnapshotManager(context(), + null, + receiveManager, + null); this.receiveManager.init(this.config); this.connectionId = new ConnectionId( new InetSocketAddress("localhost",8081), From 934f2a8e3e50ed239f2bd4d1d1291fcbbf33b6e0 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Wed, 11 Oct 2023 09:15:18 +0000 Subject: [PATCH 4/6] fix format & computer option --- .../computer/core/config/ComputerOptions.java | 18 +++++++++--------- .../core/receiver/MessageRecvManager.java | 18 ++++++++++++------ .../core/snapshot/SnapshotManager.java | 8 ++++---- .../core/compute/ComputeManagerTest.java | 6 +++--- .../core/compute/input/EdgesInputTest.java | 4 ++-- .../core/compute/input/MessageInputTest.java | 4 ++-- .../core/network/DataServerManagerTest.java | 4 ++-- .../core/receiver/MessageRecvManagerTest.java | 4 ++-- 8 files changed, 36 insertions(+), 30 deletions(-) diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java index 7fa42c266..f548aa135 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java @@ -201,7 +201,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_WRITE = new ConfigOption<>( "snapshot.write", - "Whether write snapshot of input vertex and edge partitions", + "Whether to write snapshot of input vertex/edge partitions.", allowValues(true, false), false ); @@ -209,15 +209,15 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_LOAD = new ConfigOption<>( "snapshot.load", - "Whether use snapshot of input vertex and edge partitions", + "Whether to load from snapshot of vertex/edge partitions.", allowValues(true, false), false ); - public static final ConfigOption SNAPSHOT_VIEW_KEY = + public static final ConfigOption SNAPSHOT_NAME = new ConfigOption<>( - "snapshot.view_key", - "View key of target snapshot", + "snapshot.name", + "The user-defined snapshot name.", null, "" ); @@ -225,7 +225,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_ENDPOINT = new ConfigOption<>( "snapshot.minio_endpoint", - "MinIO endpoint", + "The endpoint of MinIO, MinIO can be used to store snapshots.", null, "" ); @@ -233,7 +233,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_ACCESS_KEY = new ConfigOption<>( "snapshot.minio_access_key", - "MinIO access key", + "The access key of MinIO.", null, "" ); @@ -241,7 +241,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_SECRET_KEY = new ConfigOption<>( "snapshot.minio_secret_key", - "MinIO secret key", + "The secret key of MinIO.", null, "" ); @@ -249,7 +249,7 @@ public static synchronized ComputerOptions instance() { public static final ConfigOption SNAPSHOT_MINIO_BUCKET_NAME = new ConfigOption<>( "snapshot.minio_bucket_name", - "MinIO bucket name", + "The bucket name of MinIO.", null, "" ); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index a2f37c259..54237d4f3 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -91,10 +91,14 @@ public void init(Config config) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, Constants.INPUT_SUPERSTEP); - this.vertexPartitions = new VertexMessageRecvPartitions( - this.context, fileGenerator, this.sortManager, this.snapshotManager); - this.edgePartitions = new EdgeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager, this.snapshotManager); + this.vertexPartitions = new VertexMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); + this.edgePartitions = new EdgeMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; @@ -109,8 +113,10 @@ public void init(Config config) { public void beforeSuperstep(Config config, int superstep) { SuperstepFileGenerator fileGenerator = new SuperstepFileGenerator( this.fileManager, superstep); - this.messagePartitions = new ComputeMessageRecvPartitions( - this.context, fileGenerator, this.sortManager, this.snapshotManager); + this.messagePartitions = new ComputeMessageRecvPartitions(this.context, + fileGenerator, + this.sortManager, + this.snapshotManager); this.expectedFinishMessages = this.workerCount; this.finishMessagesFuture = new CompletableFuture<>(); this.finishMessagesCount.set(this.expectedFinishMessages); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java index b63935828..4734ee5aa 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/snapshot/SnapshotManager.java @@ -61,7 +61,7 @@ public class SnapshotManager implements Manager { private final int partitionCount; private final boolean loadSnapshot; private final boolean writeSnapshot; - private final String viewKey; + private final String snapshotName; private MinioClient minioClient; private String bucketName; @@ -78,7 +78,7 @@ public SnapshotManager(ComputerContext context, MessageSendManager sendManager, this.workerInfo = workerInfo; this.partitioner = context.config().createObject(ComputerOptions.WORKER_PARTITIONER); this.partitionCount = context.config().get(ComputerOptions.JOB_PARTITIONS_COUNT); - this.viewKey = context.config().get(ComputerOptions.SNAPSHOT_VIEW_KEY); + this.snapshotName = context.config().get(ComputerOptions.SNAPSHOT_NAME); } @Override @@ -243,8 +243,8 @@ private void clearObjectsIfExist(String dirName) throws Exception { } private String generateObjectDirName(MessageType messageType, int partitionId) { - // dir name: {VIEW_KEY}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ - Path path = Paths.get(this.viewKey, + // dir name: {SNAPSHOT_NAME}/{PARTITIONER}/{PARTITION_COUNT}/VERTEX/{PARTITION_ID}/ + Path path = Paths.get(this.snapshotName, this.partitioner.getClass().getSimpleName(), String.valueOf(this.partitionCount), messageType.name(), diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java index a43baef84..63f0c5b2d 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java @@ -106,9 +106,9 @@ public void setup() { sortManager); this.managers.add(receiveManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", @@ -231,7 +231,7 @@ private static void addMessages(Consumer consumer) message.add(id); ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(id, message), - consumer); + consumer); } } } diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java index b6c427596..f164d868b 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/EdgesInputTest.java @@ -139,9 +139,9 @@ private void testEdgeFreq(EdgeFrequency freq) sortManager); this.managers.add(receiveManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.managers.add(snapshotManager); this.managers.initAll(this.config); ConnectionId connectionId = new ConnectionId(new InetSocketAddress( diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java index 3889f80f7..e2ad3de58 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/input/MessageInputTest.java @@ -92,9 +92,9 @@ public void setup() { sortManager); this.managers.add(receiveManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.managers.add(snapshotManager); this.managers.initAll(this.config); this.connectionId = new ConnectionId(new InetSocketAddress("localhost", diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java index 6c8673c54..cf8e3e611 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/DataServerManagerTest.java @@ -57,9 +57,9 @@ public void test() { fileManager, sortManager); SnapshotManager snapshotManager = new SnapshotManager(context(), - null, + null, recvManager, - null); + null); recvManager.init(config); ConnectionManager connManager = new TransportConnectionManager(); DataServerManager serverManager = new DataServerManager(connManager, diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java index 8887b7395..b423d3861 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java @@ -76,9 +76,9 @@ public void setup() { this.fileManager, this.sortManager); this.snapshotManager = new SnapshotManager(context(), - null, + null, receiveManager, - null); + null); this.receiveManager.init(this.config); this.connectionId = new ConnectionId( new InetSocketAddress("localhost",8081), From c699ea4a339e2ca1ab2cb2b46ddb0f9c7fc0bb08 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Fri, 13 Oct 2023 06:16:28 +0000 Subject: [PATCH 5/6] fix ci --- computer-test/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/computer-test/pom.xml b/computer-test/pom.xml index 76f992b02..4b55da32f 100644 --- a/computer-test/pom.xml +++ b/computer-test/pom.xml @@ -53,6 +53,10 @@ okhttp com.squareup.okhttp + + okhttp + com.squareup.okhttp3 + From f3a2e5b9c75ee7115af85d04ab49a0168b9bb11c Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Fri, 13 Oct 2023 06:42:17 +0000 Subject: [PATCH 6/6] add license --- computer-dist/release-docs/LICENSE | 1 + .../release-docs/licenses/LICENSE-minio.txt | 202 ++++++++++++++++++ 2 files changed, 203 insertions(+) create mode 100644 computer-dist/release-docs/licenses/LICENSE-minio.txt diff --git a/computer-dist/release-docs/LICENSE b/computer-dist/release-docs/LICENSE index 251a0797f..3bd66078f 100644 --- a/computer-dist/release-docs/LICENSE +++ b/computer-dist/release-docs/LICENSE @@ -335,6 +335,7 @@ The following components are provided under the Apache 2.0 License. (Apache License, Version 2.0) * Guava: Google Core Libraries for Java(com.google.guava:guava:25.1-jre-none ) (Apache License, Version 2.0) * LZ4 and xxHash(org.lz4:lz4-java:1.4.0-none ) (Apache License, Version 2.0) * Metrics Core(io.dropwizard.metrics:metrics-core:3.2.6-none ) + (Apache License, Version 2.0) * Minio(io.minio:minio:8.5.6-https://min.io ) (Apache License, Version 2.0) * Netty/All-in-One(io.netty:netty-all:4.1.42.Final-none ) (Apache License, Version 2.0) * OkHttp(com.squareup.okhttp:okhttp:2.7.5-none ) (Apache License, Version 2.0) * hugegraph-rpc(com.baidu.hugegraph:hugegraph-rpc:2.0.1-none ) diff --git a/computer-dist/release-docs/licenses/LICENSE-minio.txt b/computer-dist/release-docs/licenses/LICENSE-minio.txt new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/computer-dist/release-docs/licenses/LICENSE-minio.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.