Skip to content

Commit

Permalink
fix: rocksdb
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed May 8, 2023
1 parent b13bf6d commit 2e76d4b
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ private void waitUntilAllTasksCompleted() {
private class StandardHugeGraphParams implements HugeGraphParams {

private HugeGraph graph = StandardHugeGraph.this;
private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this.graph);
private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this);

private void graph(HugeGraph graph) {
this.graph = graph;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snap
try {
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
String rootDir = Paths.get(snapshotDir).getParent().toString();
String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
Expand Down Expand Up @@ -200,7 +200,7 @@ private String decompressSnapshot(SnapshotReader reader,
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString();
String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hugegraph.backend.store.raft.rpc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.hugegraph.backend.store.raft.RaftStoreClosure;
Expand Down Expand Up @@ -77,7 +78,11 @@ public void forwardToLeader(PeerId leaderId, StoreCommand command,
public void setResponse(StoreCommandResponse response) {
if (response.getStatus()) {
LOG.debug("StoreCommandResponse status ok");
future.complete(Status.OK(), () -> null);
CompletableFuture<Void> supplyFuture = new CompletableFuture<>();
supplyFuture.complete(null);
future.complete(Status.OK(), () -> {
return supplyFuture;
});
} else {
LOG.debug("StoreCommandResponse status error");
Status status = new Status(RaftError.UNKNOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.util.Log;
Expand All @@ -41,14 +41,14 @@ public class EphemeralJobQueue {

private final AtomicReference<State> state;

private final HugeGraph graph;
private final HugeGraphParams graph;

private enum State {
INIT,
EXECUTE,
}

public EphemeralJobQueue(HugeGraph graph) {
public EphemeralJobQueue(HugeGraphParams graph) {
this.state = new AtomicReference<>(State.INIT);
this.graph = graph;
this.pendingQueue = new ArrayBlockingQueue<>(CAPACITY);
Expand All @@ -69,6 +69,10 @@ public void add(EphemeralJob<?> job) {
this.reScheduleIfNeeded();
}

protected HugeGraphParams params() {
return this.graph;
}

protected Queue<EphemeralJob<?>> queue() {
return this.pendingQueue;
}
Expand All @@ -81,7 +85,7 @@ public void reScheduleIfNeeded() {
if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
try {
BatchEphemeralJob job = new BatchEphemeralJob(this);
EphemeralJobBuilder.of(this.graph)
EphemeralJobBuilder.of(this.graph.graph())
.name("batch-ephemeral-job")
.job(job)
.schedule();
Expand Down Expand Up @@ -181,6 +185,7 @@ private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prev) throws E
GraphIndexTransaction systemTx = this.params().graphTransaction().indexTransaction();
Object ret = prev;
for (EphemeralJob<?> job : jobs) {
initJob(job);
Object obj = job.call();
if (job instanceof Reduce) {
ret = ((Reduce) job).reduce(ret, obj);
Expand All @@ -193,6 +198,11 @@ private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prev) throws E
return ret;
}

private void initJob(EphemeralJob<?> job) {
job.graph(this.graph());
job.params(this.params());
}

@Override
public Object call() throws Exception {
try {
Expand All @@ -204,12 +214,16 @@ public Object call() throws Exception {
Thread.currentThread().interrupt();
if (queue != null) {
queue.queue().clear();
queue.consumeComplete();
}
throw e;
}

if (queue != null && !queue.isEmpty()) {
queue.reScheduleIfNeeded();
if (queue != null) {
queue.consumeComplete();
if (!queue.isEmpty()) {
queue.reScheduleIfNeeded();
}
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected void save() {
}
}

protected void graph(HugeGraph graph) {
public void graph(HugeGraph graph) {
this.graph = graph;
}

Expand Down Expand Up @@ -176,7 +176,7 @@ public abstract static class SysTaskCallable<V> extends TaskCallable<V> {

private HugeGraphParams params = null;

protected void params(HugeGraphParams params) {
public void params(HugeGraphParams params) {
this.params = params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public static void decompressTar(String sourceFile, String outputDir,
Files.createDirectories(newPath);
} else {
// check parent folder again
Path parent = newPath.getParent();
Path parent = newPath.toAbsolutePath().getParent();
if (parent != null) {
if (Files.notExists(parent)) {
Files.createDirectories(parent);
Expand Down Expand Up @@ -176,7 +176,7 @@ private static Path zipSlipProtect(ArchiveEntry entry, Path targetDir)

public static void compressZip(String inputDir, String outputFile,
Checksum checksum) throws IOException {
String rootDir = Paths.get(inputDir).getParent().toString();
String rootDir = Paths.get(inputDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(inputDir).getFileName().toString();
compressZip(rootDir, sourceDir, outputFile, checksum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public long totalSize() {
}

public void createCheckpoint(String targetPath) {
Path parentName = Paths.get(targetPath).getParent().getFileName();
Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName();
assert parentName.toString().startsWith("snapshot") : targetPath;
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ public void resumeSnapshot(String snapshotPath) {
public String buildSnapshotPath(String snapshotPrefix) {
// Like: parent_path/rocksdb-data/*, * can be g,m,s
Path originDataPath = Paths.get(this.dataPath);
Path parentParentPath = originDataPath.getParent().getParent();
Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
Path pureDataPath = parentParentPath.relativize(originDataPath);
Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath());
// Like: parent_path/snapshot_rocksdb-data/*
Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
pureDataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ public Map<String, String> createSnapshot(String snapshotPrefix) {
for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
// Like: parent_path/rocksdb-data/*, * maybe g,m,s
Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath();
Path parentParentPath = originDataPath.getParent().getParent();
Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
Path pureDataPath = parentParentPath.relativize(originDataPath);
// Like: parent_path/snapshot_rocksdb-data/*
Expand All @@ -740,7 +740,7 @@ public Map<String, String> createSnapshot(String snapshotPrefix) {
RocksDBSessions sessions = entry.getValue();
sessions.createSnapshot(snapshotPath.toString());

String snapshotDir = snapshotPath.getParent().toString();
String snapshotDir = snapshotPath.toAbsolutePath().getParent().toString();
// Find correspond data HugeType key
String diskTableKey = this.findDiskTableKeyByPath(
entry.getKey());
Expand Down Expand Up @@ -781,7 +781,7 @@ public void resumeSnapshot(String snapshotPrefix, boolean deleteSnapshot) {

if (deleteSnapshot) {
// Delete empty snapshot parent directory
Path parentPath = Paths.get(snapshotPath).getParent();
Path parentPath = Paths.get(snapshotPath).toAbsolutePath().getParent();
if (Files.list(parentPath).count() == 0) {
FileUtils.deleteDirectory(parentPath.toFile());
}
Expand Down Expand Up @@ -866,7 +866,7 @@ private Map<String, String> reportDiskMapping() {
diskMapping.put(TABLE_GENERAL_KEY, this.dataPath);
for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String key = this.store + "/" + e.getKey().name();
String value = Paths.get(e.getValue()).getParent().toString();
String value = Paths.get(e.getValue()).toAbsolutePath().getParent().toString();
diskMapping.put(key, value);
}
return diskMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private void createTable(String table) throws RocksDBException {
Path sstFile = Paths.get(this.dataPath, table,
number + RocksDBIngester.SST);
try {
FileUtils.forceMkdir(sstFile.getParent().toFile());
FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile());
} catch (IOException e) {
throw new BackendException("Can't make directory for sst: '%s'",
e, sstFile.toString());
Expand Down

0 comments on commit 2e76d4b

Please sign in to comment.