Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support newly added node install and resume snapshot #1439

Merged
merged 3 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package com.baidu.hugegraph.backend.store;

import java.util.Iterator;
import java.util.Set;
import java.util.Map;

import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
Expand Down Expand Up @@ -117,7 +117,7 @@ public default void setCounterLowest(HugeType type, long lowest) {
// Get current counter for a specific type
public long getCounter(HugeType type);

public default Set<String> createSnapshot(String snapshotDir) {
public default Map<String, String> createSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("createSnapshot");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.zip.Checksum;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.Closure;
Expand All @@ -36,36 +40,55 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.CompressUtil;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.google.protobuf.ByteString;

public class StoreSnapshotFile {

private static final Logger LOG = Log.logger(StoreSnapshotFile.class);

public static final String SNAPSHOT_DIR = "snapshot";
private static final String TAR = ".tar";
private static final String SNAPSHOT_TAR = SNAPSHOT_DIR + TAR;
private static final String MANIFEST = "manifest";

private final RaftBackendStore[] stores;
private final Map<String, String> dataDisks;

/**
 * Creates the snapshot helper for the given raft-wrapped stores and
 * collects each store's disk mapping (disk/table key -> data path),
 * which is later used to locate where a snapshot tar must be unpacked.
 */
public StoreSnapshotFile(RaftBackendStore[] stores) {
    this.stores = stores;
    this.dataDisks = new HashMap<>();
    for (RaftBackendStore raftStore : stores) {
        /*
         * Call reportDiskMapping() on the wrapped "store" field via
         * reflection (Whitebox) — the underlying store (presumably
         * RocksDBStore) is not directly accessible from here.
         */
        this.dataDisks.putAll(Whitebox.invoke(raftStore, "store",
                                              "reportDiskMapping"));
    }
    /*
     * Example of the resulting mapping:
     * general=/parent_path/rocksdb-data
     * g/VERTEX=/parent_path/rocksdb-vertex
     */
    LOG.debug("The store data disks mapping {}", this.dataDisks);
}

public void save(SnapshotWriter writer, Closure done,
ExecutorService executor) {
try {
// Write snapshot to real directory
Set<String> snapshotDirs = this.doSnapshotSave();
Map<String, String> snapshotDirMaps = this.doSnapshotSave();
executor.execute(() -> {
String jraftSnapshotPath = this.writeManifest(writer,
snapshotDirs,
done);
this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
try {
this.compressSnapshotDir(writer, snapshotDirMaps);
this.deleteSnapshotDirs(snapshotDirMaps.keySet());
done.run(Status.OK());
} catch (Throwable e) {
LOG.error("Failed to compress snapshot", e);
done.run(new Status(RaftError.EIO,
"Failed to compress snapshot, " +
"error is %s", e.getMessage()));
}
});
} catch (Throwable e) {
LOG.error("Failed to save snapshot", e);
Expand All @@ -76,43 +99,38 @@ public void save(SnapshotWriter writer, Closure done,
}

public boolean load(SnapshotReader reader) {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_TAR);
String readerPath = reader.getPath();
if (meta == null) {
LOG.error("Can't find snapshot archive file, path={}", readerPath);
return false;
Set<String> snapshotDirTars = reader.listFiles();
LOG.info("The snapshot tar files to be loaded are {}", snapshotDirTars);
Set<String> snapshotDirs = new HashSet<>();
for (String snapshotDirTar : snapshotDirTars) {
try {
String snapshotDir = this.decompressSnapshot(reader,
snapshotDirTar);
snapshotDirs.add(snapshotDir);
} catch (Throwable e) {
LOG.error("Failed to decompress snapshot tar", e);
return false;
}
}
String jraftSnapshotPath = Paths.get(readerPath, SNAPSHOT_DIR)
.toString();

try {
// Decompress manifest and data directory
this.decompressSnapshot(readerPath, meta);
this.doSnapshotLoad();
File tmp = new File(jraftSnapshotPath);
// Delete the decompressed temporary file. If the deletion fails
// (although it is a small probability event), it may affect the
// next snapshot decompression result. Therefore, the safest way
// is to terminate the state machine immediately. Users can choose
// to manually delete and restart according to the log information.
if (tmp.exists()) {
FileUtils.forceDelete(tmp);
}
return true;
this.deleteSnapshotDirs(snapshotDirs);
} catch (Throwable e) {
LOG.error("Failed to load snapshot", e);
return false;
}
return true;
}

private Set<String> doSnapshotSave() {
Set<String> snapshotDirs = InsertionOrderUtil.newSet();
private Map<String, String> doSnapshotSave() {
Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
for (RaftBackendStore store : this.stores) {
Set<String> snapshots = store.originStore()
.createSnapshot(SNAPSHOT_DIR);
snapshotDirs.addAll(snapshots);
snapshotDirMaps.putAll(store.originStore()
.createSnapshot(SNAPSHOT_DIR));
}
LOG.info("Saved all snapshots: {}", snapshotDirs);
return snapshotDirs;
LOG.info("Saved all snapshots: {}", snapshotDirMaps);
return snapshotDirMaps;
}

private void doSnapshotLoad() {
Expand All @@ -121,61 +139,76 @@ private void doSnapshotLoad() {
}
}

private String writeManifest(SnapshotWriter writer,
Set<String> snapshotFiles,
Closure done) {
private void compressSnapshotDir(SnapshotWriter writer,
Map<String, String> snapshotDirMaps) {
String writerPath = writer.getPath();
// Write all backend compressed snapshot file path to manifest
String jraftSnapshotPath = Paths.get(writerPath, SNAPSHOT_DIR)
.toString();
File snapshotManifestFile = new File(jraftSnapshotPath, MANIFEST);
try {
FileUtils.writeLines(snapshotManifestFile, snapshotFiles);
} catch (IOException e) {
done.run(new Status(RaftError.EIO,
"Failed to write backend snapshot file path " +
"to manifest"));
}
return jraftSnapshotPath;
}
for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
String snapshotDir = entry.getKey();
String diskTableKey = entry.getValue();
String snapshotDirTar = Paths.get(snapshotDir).getFileName()
.toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar)
.toString();
Checksum checksum = new CRC64();
try {
CompressUtil.compressTar(snapshotDir, outputFile, checksum);
} catch (Throwable e) {
throw new RaftException(
"Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
}

private void compressJraftSnapshotDir(SnapshotWriter writer,
String jraftSnapshotPath,
Closure done) {
String writerPath = writer.getPath();
String outputFile = Paths.get(writerPath, SNAPSHOT_TAR).toString();
try {
LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
Checksum checksum = new CRC64();
CompressUtil.compressTar(jraftSnapshotPath, outputFile, checksum);
metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
if (writer.addFile(SNAPSHOT_TAR, metaBuilder.build())) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO,
"Failed to add snapshot file: '%s'",
writerPath));
/*
* snapshot_rocksdb-data.tar -> general
* snapshot_rocksdb-vertex.tar -> g/VERTEX
*/
metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
throw new RaftException("Failed to add snapshot file: '%s'",
snapshotDirTar);
}
} catch (Throwable e) {
LOG.error("Failed to compress snapshot, path={}, files={}, {}.",
writerPath, writer.listFiles(), e);
done.run(new Status(RaftError.EIO,
"Failed to compress snapshot '%s' due to: %s",
writerPath, e.getMessage()));
}
}

private void decompressSnapshot(String readerPath, LocalFileMeta meta)
throws IOException {
String archiveFile = Paths.get(readerPath, SNAPSHOT_TAR).toString();
private String decompressSnapshot(SnapshotReader reader,
String snapshotDirTar)
throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
throw new IOException("Can't find snapshot archive file, path=" +
snapshotDirTar);
}

String diskTableKey = meta.getUserMeta().toStringUtf8();
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
String snapshotDir = Paths.get(parentPath,
StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
LOG.info("Delete stale snapshot dir {}", snapshotDir);

Checksum checksum = new CRC64();
CompressUtil.decompressTar(archiveFile, readerPath, checksum);
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
.toString();
CompressUtil.decompressTar(archiveFile, parentPath, checksum);
if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
E.checkArgument(expected.equals(actual),
"Snapshot checksum error: '%s' != '%s'",
actual, expected);
}
return snapshotDir;
}

/**
 * Best-effort removal of the given snapshot directories after their
 * contents have been archived/consumed. Deletion failures are ignored
 * ({@code FileUtils.deleteQuietly} never throws).
 */
private void deleteSnapshotDirs(Set<String> snapshotDirs) {
    snapshotDirs.forEach(dir -> FileUtils.deleteQuietly(new File(dir)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.LZ4Util;
import com.baidu.hugegraph.util.Log;

Expand Down Expand Up @@ -162,7 +163,9 @@ public void onApply(Iterator iter) {

private void applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer, boolean forwarded) {
BackendStore store = type != StoreType.ALL ? this.store(type) : null;
E.checkState(type != StoreType.ALL,
"Can't apply command for all store at one time");
BackendStore store = this.store(type);
switch (action) {
case CLEAR:
boolean clearSpace = buffer.read() > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
snapshotPath, null).rocksdb) {
RocksDBStdSessions.createCheckpoint(rocksdb, snapshotLinkPath);
}
LOG.debug("The snapshot {} has been hard linked to {}",
snapshotPath, snapshotLinkPath);
LOG.info("The snapshot {} has been hard linked to {}",
snapshotPath, snapshotLinkPath);
return snapshotLinkPath;
}

Expand Down
Loading