From 2e76d4beb1ba84066cc2c52dd779463998f72467 Mon Sep 17 00:00:00 2001 From: vaughn Date: Mon, 8 May 2023 11:52:54 +0800 Subject: [PATCH] fix: rocksdb --- .../apache/hugegraph/StandardHugeGraph.java | 2 +- .../backend/store/raft/StoreSnapshotFile.java | 4 +-- .../backend/store/raft/rpc/RpcForwarder.java | 7 ++++- .../backend/tx/EphemeralJobQueue.java | 26 ++++++++++++++----- .../apache/hugegraph/task/TaskCallable.java | 4 +-- .../apache/hugegraph/util/CompressUtil.java | 4 +-- .../backend/store/rocksdb/OpenedRocksDB.java | 2 +- .../store/rocksdb/RocksDBStdSessions.java | 4 +-- .../backend/store/rocksdb/RocksDBStore.java | 8 +++--- .../store/rocksdbsst/RocksDBSstSessions.java | 2 +- 10 files changed, 41 insertions(+), 22 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index c50e51264b..c896618601 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -1165,7 +1165,7 @@ private void waitUntilAllTasksCompleted() { private class StandardHugeGraphParams implements HugeGraphParams { private HugeGraph graph = StandardHugeGraph.this; - private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this.graph); + private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this); private void graph(HugeGraph graph) { this.graph = graph; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java index 6cf08f1e58..7c8d604583 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java @@ -164,7 +164,7 @@ private void compressSnapshotDir(SnapshotWriter writer, Map snap try { LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile); long begin = System.currentTimeMillis(); - String rootDir = Paths.get(snapshotDir).getParent().toString(); + String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString(); String sourceDir = Paths.get(snapshotDir).getFileName().toString(); CompressStrategyManager.getDefault() .compressZip(rootDir, sourceDir, outputFile, checksum); @@ -200,7 +200,7 @@ private String decompressSnapshot(SnapshotReader reader, E.checkArgument(this.dataDisks.containsKey(diskTableKey), "The data path for '%s' should be exist", diskTableKey); String dataPath = this.dataDisks.get(diskTableKey); - String parentPath = Paths.get(dataPath).getParent().toString(); + String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString(); String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR)) .toString(); FileUtils.deleteDirectory(new File(snapshotDir)); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java index 389a2ecf44..7fe557ea0b 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java @@ -17,6 +17,7 @@ package org.apache.hugegraph.backend.store.raft.rpc; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.hugegraph.backend.store.raft.RaftStoreClosure; @@ -77,7 +78,11 @@ public void forwardToLeader(PeerId leaderId, StoreCommand command, public void setResponse(StoreCommandResponse response) { if (response.getStatus()) { LOG.debug("StoreCommandResponse status ok"); - future.complete(Status.OK(), () -> null); + CompletableFuture supplyFuture = new CompletableFuture<>(); + supplyFuture.complete(null); + future.complete(Status.OK(), () -> { + return supplyFuture; + }); } else { LOG.debug("StoreCommandResponse status error"); Status status = new Status(RaftError.UNKNOWN, diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java index 4484ec8eea..0b4254bc34 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java @@ -25,7 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; -import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.job.EphemeralJob; import org.apache.hugegraph.job.EphemeralJobBuilder; import org.apache.hugegraph.util.Log; @@ -41,14 +41,14 @@ public class EphemeralJobQueue { private final AtomicReference state; - private final HugeGraph graph; + private final HugeGraphParams graph; private enum State { INIT, EXECUTE, } - public EphemeralJobQueue(HugeGraph graph) { + public EphemeralJobQueue(HugeGraphParams graph) { this.state = new AtomicReference<>(State.INIT); this.graph = graph; this.pendingQueue = new ArrayBlockingQueue<>(CAPACITY); @@ -69,6 +69,10 @@ public void add(EphemeralJob job) { this.reScheduleIfNeeded(); } + protected HugeGraphParams params() { + return this.graph; + } + protected Queue> queue() { return this.pendingQueue; } @@ -81,7 +85,7 @@ public void reScheduleIfNeeded() { if (this.state.compareAndSet(State.INIT, State.EXECUTE)) { try { BatchEphemeralJob job = new BatchEphemeralJob(this); - EphemeralJobBuilder.of(this.graph) + EphemeralJobBuilder.of(this.graph.graph()) .name("batch-ephemeral-job") .job(job) .schedule(); @@ -181,6 +185,7 @@ private Object executeBatchJob(List> jobs, Object prev) throws E GraphIndexTransaction systemTx = this.params().graphTransaction().indexTransaction(); Object ret = prev; for (EphemeralJob job : jobs) { + initJob(job); Object obj = job.call(); if (job instanceof Reduce) { ret = ((Reduce) job).reduce(ret, obj); @@ -193,6 +198,11 @@ private Object executeBatchJob(List> jobs, Object prev) throws E return ret; } + private void initJob(EphemeralJob job) { + job.graph(this.graph()); + job.params(this.params()); + } + @Override public Object call() throws Exception { try { @@ -204,12 +214,16 @@ public Object call() throws Exception { Thread.currentThread().interrupt(); if (queue != null) { queue.queue().clear(); + queue.consumeComplete(); } throw e; } - if (queue != null && !queue.isEmpty()) { - queue.reScheduleIfNeeded(); + if (queue != null) { + queue.consumeComplete(); + if (!queue.isEmpty()) { + queue.reScheduleIfNeeded(); + } } throw e; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java index 117613c731..d73a6d71e3 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java @@ -124,7 +124,7 @@ protected void save() { } } - protected void graph(HugeGraph graph) { + public void graph(HugeGraph graph) { this.graph = graph; } @@ -176,7 +176,7 @@ public abstract static class SysTaskCallable extends TaskCallable { private HugeGraphParams params = null; - protected void params(HugeGraphParams params) { + public void params(HugeGraphParams params) { this.params = params; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java index 2a01ba6a04..0f5c179f47 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java @@ -145,7 +145,7 @@ public static void decompressTar(String sourceFile, String outputDir, Files.createDirectories(newPath); } else { // check parent folder again - Path parent = newPath.getParent(); + Path parent = newPath.toAbsolutePath().getParent(); if (parent != null) { if (Files.notExists(parent)) { Files.createDirectories(parent); @@ -176,7 +176,7 @@ private static Path zipSlipProtect(ArchiveEntry entry, Path targetDir) public static void compressZip(String inputDir, String outputFile, Checksum checksum) throws IOException { - String rootDir = Paths.get(inputDir).getParent().toString(); + String rootDir = Paths.get(inputDir).toAbsolutePath().getParent().toString(); String sourceDir = Paths.get(inputDir).getFileName().toString(); compressZip(rootDir, sourceDir, outputFile, checksum); } diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java index 3ae6ba3fea..91e02878aa 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java @@ -99,7 +99,7 @@ public long totalSize() { } public void createCheckpoint(String targetPath) { - Path parentName = Paths.get(targetPath).getParent().getFileName(); + Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName(); assert parentName.toString().startsWith("snapshot") : targetPath; // https://github.com/facebook/rocksdb/wiki/Checkpoints try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) { diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 71a66906dd..bcbe37b7c3 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -295,9 +295,9 @@ public void resumeSnapshot(String snapshotPath) { public String buildSnapshotPath(String snapshotPrefix) { // Like: parent_path/rocksdb-data/*, * can be g,m,s Path originDataPath = Paths.get(this.dataPath); - Path parentParentPath = originDataPath.getParent().getParent(); + Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent(); // Like: rocksdb-data/* - Path pureDataPath = parentParentPath.relativize(originDataPath); + Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath()); // Like: parent_path/snapshot_rocksdb-data/* Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" + pureDataPath); diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java index 4158c7d832..2dba5fa766 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -729,7 +729,7 @@ public Map createSnapshot(String snapshotPrefix) { for (Map.Entry entry : this.dbs.entrySet()) { // Like: parent_path/rocksdb-data/*, * maybe g,m,s Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath(); - Path parentParentPath = originDataPath.getParent().getParent(); + Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent(); // Like: rocksdb-data/* Path pureDataPath = parentParentPath.relativize(originDataPath); // Like: parent_path/snapshot_rocksdb-data/* @@ -740,7 +740,7 @@ public Map createSnapshot(String snapshotPrefix) { RocksDBSessions sessions = entry.getValue(); sessions.createSnapshot(snapshotPath.toString()); - String snapshotDir = snapshotPath.getParent().toString(); + String snapshotDir = snapshotPath.toAbsolutePath().getParent().toString(); // Find correspond data HugeType key String diskTableKey = this.findDiskTableKeyByPath( entry.getKey()); @@ -781,7 +781,7 @@ public void resumeSnapshot(String snapshotPrefix, boolean deleteSnapshot) { if (deleteSnapshot) { // Delete empty snapshot parent directory - Path parentPath = Paths.get(snapshotPath).getParent(); + Path parentPath = Paths.get(snapshotPath).toAbsolutePath().getParent(); if (Files.list(parentPath).count() == 0) { FileUtils.deleteDirectory(parentPath.toFile()); } @@ -866,7 +866,7 @@ private Map reportDiskMapping() { diskMapping.put(TABLE_GENERAL_KEY, this.dataPath); for (Map.Entry e : this.tableDiskMapping.entrySet()) { String key = this.store + "/" + e.getKey().name(); - String value = Paths.get(e.getValue()).getParent().toString(); + String value = Paths.get(e.getValue()).toAbsolutePath().getParent().toString(); diskMapping.put(key, value); } return diskMapping; diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index 393cb2ef13..3d2b7f867a 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -108,7 +108,7 @@ private void createTable(String table) throws RocksDBException { Path sstFile = Paths.get(this.dataPath, table, number + RocksDBIngester.SST); try { - FileUtils.forceMkdir(sstFile.getParent().toFile()); + FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile()); } catch (IOException e) { throw new BackendException("Can't make directory for sst: '%s'", e, sstFile.toString());