Support snapshot install and resume for newly added nodes (apache#1439)
Change-Id: I0bb904a967eaada26b51b9b47bbfde5d31653bd1
Linary authored May 19, 2021
1 parent 1ede97c commit 558ab02
Showing 5 changed files with 148 additions and 84 deletions.
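
In short, createSnapshot() now returns a map of snapshot directory to disk/table key rather than a plain set of paths; each store's snapshot directory is compressed into its own tar, the disk/table key is recorded in the tar's user metadata, and on install each tar is decompressed next to the data disk that its key maps to. The standalone sketch below (class name and paths are illustrative examples, not the HugeGraph API) replays just that routing logic with plain Java:

    import java.nio.file.Paths;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SnapshotRoutingSketch {

        public static void main(String[] args) {
            // snapshot dir -> disk/table key, as createSnapshot() now returns
            Map<String, String> snapshotDirMaps = new LinkedHashMap<>();
            snapshotDirMaps.put("/parent_path/snapshot_rocksdb-data", "general");
            snapshotDirMaps.put("/parent_path/snapshot_rocksdb-vertex", "g/VERTEX");

            // disk/table key -> data path, as reported by the store's disk mapping
            Map<String, String> dataDisks = new LinkedHashMap<>();
            dataDisks.put("general", "/parent_path/rocksdb-data");
            dataDisks.put("g/VERTEX", "/parent_path/rocksdb-vertex");

            for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
                // Each snapshot dir is archived as "<dir-name>.tar" on save
                String tarName = Paths.get(entry.getKey()).getFileName() + ".tar";
                String diskTableKey = entry.getValue();
                // On install, the tar is unpacked next to the data disk of its key
                String parentPath = Paths.get(dataDisks.get(diskTableKey))
                                         .getParent().toString();
                String targetDir = Paths.get(parentPath,
                                             tarName.substring(0, tarName.length() - 4))
                                        .toString();
                System.out.println(tarName + " [" + diskTableKey + "] -> " + targetDir);
            }
        }
    }

Running it prints, for example, "snapshot_rocksdb-data.tar [general] -> /parent_path/snapshot_rocksdb-data", which mirrors the mapping noted in the comments of the snapshot-file changes below.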
@@ -20,7 +20,7 @@
package com.baidu.hugegraph.backend.store;

import java.util.Iterator;
import java.util.Set;
import java.util.Map;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
@@ -117,7 +117,7 @@ public default void setCounterLowest(HugeType type, long lowest) {
// Get current counter for a specific type
public long getCounter(HugeType type);

public default Set<String> createSnapshot(String snapshotDir) {
public default Map<String, String> createSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("createSnapshot");
}

@@ -22,11 +22,15 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.zip.Checksum;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.Closure;
@@ -36,36 +40,55 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.CompressUtil;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.google.protobuf.ByteString;

public class StoreSnapshotFile {

private static final Logger LOG = Log.logger(StoreSnapshotFile.class);

public static final String SNAPSHOT_DIR = "snapshot";
private static final String TAR = ".tar";
private static final String SNAPSHOT_TAR = SNAPSHOT_DIR + TAR;
private static final String MANIFEST = "manifest";

private final RaftBackendStore[] stores;
private final Map<String, String> dataDisks;

public StoreSnapshotFile(RaftBackendStore[] stores) {
this.stores = stores;
this.dataDisks = new HashMap<>();
for (RaftBackendStore raftStore : stores) {
// Call RocksDBStore method reportDiskMapping()
this.dataDisks.putAll(Whitebox.invoke(raftStore, "store",
"reportDiskMapping"));
}
/*
* For example:
* general=/parent_path/rocksdb-data
* g/VERTEX=/parent_path/rocksdb-vertex
*/
LOG.debug("The store data disks mapping {}", this.dataDisks);
}

public void save(SnapshotWriter writer, Closure done,
ExecutorService executor) {
try {
// Write snapshot to real directory
Set<String> snapshotDirs = this.doSnapshotSave();
Map<String, String> snapshotDirMaps = this.doSnapshotSave();
executor.execute(() -> {
String jraftSnapshotPath = this.writeManifest(writer,
snapshotDirs,
done);
this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
try {
this.compressSnapshotDir(writer, snapshotDirMaps);
this.deleteSnapshotDirs(snapshotDirMaps.keySet());
done.run(Status.OK());
} catch (Throwable e) {
LOG.error("Failed to compress snapshot", e);
done.run(new Status(RaftError.EIO,
"Failed to compress snapshot, " +
"error is %s", e.getMessage()));
}
});
} catch (Throwable e) {
LOG.error("Failed to save snapshot", e);
@@ -76,43 +99,38 @@ public void save(SnapshotWriter writer, Closure done,
}

public boolean load(SnapshotReader reader) {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_TAR);
String readerPath = reader.getPath();
if (meta == null) {
LOG.error("Can't find snapshot archive file, path={}", readerPath);
return false;
Set<String> snapshotDirTars = reader.listFiles();
LOG.info("The snapshot tar files to be loaded are {}", snapshotDirTars);
Set<String> snapshotDirs = new HashSet<>();
for (String snapshotDirTar : snapshotDirTars) {
try {
String snapshotDir = this.decompressSnapshot(reader,
snapshotDirTar);
snapshotDirs.add(snapshotDir);
} catch (Throwable e) {
LOG.error("Failed to decompress snapshot tar", e);
return false;
}
}
String jraftSnapshotPath = Paths.get(readerPath, SNAPSHOT_DIR)
.toString();

try {
// Decompress manifest and data directory
this.decompressSnapshot(readerPath, meta);
this.doSnapshotLoad();
File tmp = new File(jraftSnapshotPath);
// Delete the decompressed temporary file. If the deletion fails
// (although it is a small probability event), it may affect the
// next snapshot decompression result. Therefore, the safest way
// is to terminate the state machine immediately. Users can choose
// to manually delete and restart according to the log information.
if (tmp.exists()) {
FileUtils.forceDelete(tmp);
}
return true;
this.deleteSnapshotDirs(snapshotDirs);
} catch (Throwable e) {
LOG.error("Failed to load snapshot", e);
return false;
}
return true;
}

private Set<String> doSnapshotSave() {
Set<String> snapshotDirs = InsertionOrderUtil.newSet();
private Map<String, String> doSnapshotSave() {
Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
for (RaftBackendStore store : this.stores) {
Set<String> snapshots = store.originStore()
.createSnapshot(SNAPSHOT_DIR);
snapshotDirs.addAll(snapshots);
snapshotDirMaps.putAll(store.originStore()
.createSnapshot(SNAPSHOT_DIR));
}
LOG.info("Saved all snapshots: {}", snapshotDirs);
return snapshotDirs;
LOG.info("Saved all snapshots: {}", snapshotDirMaps);
return snapshotDirMaps;
}

private void doSnapshotLoad() {
@@ -121,61 +139,76 @@ private void doSnapshotLoad() {
}
}

private String writeManifest(SnapshotWriter writer,
Set<String> snapshotFiles,
Closure done) {
private void compressSnapshotDir(SnapshotWriter writer,
Map<String, String> snapshotDirMaps) {
String writerPath = writer.getPath();
// Write all backend compressed snapshot file path to manifest
String jraftSnapshotPath = Paths.get(writerPath, SNAPSHOT_DIR)
.toString();
File snapshotManifestFile = new File(jraftSnapshotPath, MANIFEST);
try {
FileUtils.writeLines(snapshotManifestFile, snapshotFiles);
} catch (IOException e) {
done.run(new Status(RaftError.EIO,
"Failed to write backend snapshot file path " +
"to manifest"));
}
return jraftSnapshotPath;
}
for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
String snapshotDir = entry.getKey();
String diskTableKey = entry.getValue();
String snapshotDirTar = Paths.get(snapshotDir).getFileName()
.toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar)
.toString();
Checksum checksum = new CRC64();
try {
CompressUtil.compressTar(snapshotDir, outputFile, checksum);
} catch (Throwable e) {
throw new RaftException(
"Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
}

private void compressJraftSnapshotDir(SnapshotWriter writer,
String jraftSnapshotPath,
Closure done) {
String writerPath = writer.getPath();
String outputFile = Paths.get(writerPath, SNAPSHOT_TAR).toString();
try {
LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
Checksum checksum = new CRC64();
CompressUtil.compressTar(jraftSnapshotPath, outputFile, checksum);
metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
if (writer.addFile(SNAPSHOT_TAR, metaBuilder.build())) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO,
"Failed to add snapshot file: '%s'",
writerPath));
/*
* snapshot_rocksdb-data.tar -> general
* snapshot_rocksdb-vertex.tar -> g/VERTEX
*/
metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
throw new RaftException("Failed to add snapshot file: '%s'",
snapshotDirTar);
}
} catch (Throwable e) {
LOG.error("Failed to compress snapshot, path={}, files={}, {}.",
writerPath, writer.listFiles(), e);
done.run(new Status(RaftError.EIO,
"Failed to compress snapshot '%s' due to: %s",
writerPath, e.getMessage()));
}
}

private void decompressSnapshot(String readerPath, LocalFileMeta meta)
throws IOException {
String archiveFile = Paths.get(readerPath, SNAPSHOT_TAR).toString();
private String decompressSnapshot(SnapshotReader reader,
String snapshotDirTar)
throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
throw new IOException("Can't find snapshot archive file, path=" +
snapshotDirTar);
}

String diskTableKey = meta.getUserMeta().toStringUtf8();
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
String snapshotDir = Paths.get(parentPath,
StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
LOG.info("Delete stale snapshot dir {}", snapshotDir);

Checksum checksum = new CRC64();
CompressUtil.decompressTar(archiveFile, readerPath, checksum);
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
.toString();
CompressUtil.decompressTar(archiveFile, parentPath, checksum);
if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
E.checkArgument(expected.equals(actual),
"Snapshot checksum error: '%s' != '%s'",
actual, expected);
}
return snapshotDir;
}

private void deleteSnapshotDirs(Set<String> snapshotDirs) {
for (String snapshotDir : snapshotDirs) {
FileUtils.deleteQuietly(new File(snapshotDir));
}
}
}
@@ -48,6 +48,7 @@
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.LZ4Util;
import com.baidu.hugegraph.util.Log;

@@ -162,7 +163,9 @@ public void onApply(Iterator iter) {

private void applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer, boolean forwarded) {
BackendStore store = type != StoreType.ALL ? this.store(type) : null;
E.checkState(type != StoreType.ALL,
"Can't apply command for all store at one time");
BackendStore store = this.store(type);
switch (action) {
case CLEAR:
boolean clearSpace = buffer.read() > 0;
@@ -306,8 +306,8 @@ public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
snapshotPath, null).rocksdb) {
RocksDBStdSessions.createCheckpoint(rocksdb, snapshotLinkPath);
}
LOG.debug("The snapshot {} has been hard linked to {}",
snapshotPath, snapshotLinkPath);
LOG.info("The snapshot {} has been hard linked to {}",
snapshotPath, snapshotLinkPath);
return snapshotLinkPath;
}

(The diff for the fifth changed file was not loaded on this page.)
