Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let snapshot saved disk same as rocksdb data path #1392

Merged
merged 13 commits into from
Apr 1, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@

package com.baidu.hugegraph.backend.store;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeGraph;
Expand Down Expand Up @@ -160,28 +155,15 @@ public void initSystemInfo(HugeGraph graph) {
public void createSnapshot() {
String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR;
for (BackendStore store : this.stores.values()) {
if (store.features().supportsSnapshot()) {
store.createSnapshot(snapshotPrefix);
}
store.createSnapshot(snapshotPrefix);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer call store.writeSnapshot() through tx, and remove writeSnapshot() from AbstractBackendStoreProvider class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only memory backend will share store provider

}

@Override
public void resumeSnapshot() {
String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR;
Set<String> snapshotDirs = new HashSet<>();
for (BackendStore store : this.stores.values()) {
if (store.features().supportsSnapshot()) {
snapshotDirs.addAll(store.resumeSnapshot(snapshotPrefix));
}
}
// Delete all snapshot parent directories
for (String snapshotDir : snapshotDirs) {
try {
FileUtils.deleteDirectory(new File(snapshotDir));
} catch (IOException e) {
LOG.warn("Failed to delete snapshot directory {}", snapshotDir);
}
store.resumeSnapshot(snapshotPrefix, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public default Set<String> createSnapshot(String snapshotDir) {
throw new UnsupportedOperationException("createSnapshot");
}

public default Set<String> resumeSnapshot(String snapshotDir) {
public default void resumeSnapshot(String snapshotDir,
boolean deleteSnapshot) {
throw new UnsupportedOperationException("resumeSnapshot");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ public void save(SnapshotWriter writer, Closure done,
// Write snapshot to real directory
Set<String> snapshotDirs = this.doSnapshotSave();
executor.execute(() -> {
long begin = System.currentTimeMillis();
String jraftSnapshotPath =
this.writeManifest(writer, snapshotDirs, done);
String jraftSnapshotPath = this.writeManifest(writer,
snapshotDirs,
done);
this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
LOG.info("Compress snapshot cost {}ms",
System.currentTimeMillis() - begin);
});
} catch (Throwable t) {
LOG.error("Failed to save snapshot", t);
Expand Down Expand Up @@ -118,7 +116,7 @@ private Set<String> doSnapshotSave() {

private void doSnapshotLoad() {
for (RaftBackendStore store : this.stores) {
store.originStore().resumeSnapshot(SNAPSHOT_DIR);
store.originStore().resumeSnapshot(SNAPSHOT_DIR, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package com.baidu.hugegraph.backend.store.rocksdb;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
Expand All @@ -48,12 +48,16 @@ public RocksDBSessions(HugeConfig config, String database, String store) {
public abstract RocksDBSessions copy(HugeConfig config,
String database, String store);

public abstract RocksDB createSnapshotRocksDB(String snapshotPath)
throws RocksDBException;

public abstract void createSnapshot(String parentPath);

public abstract void reload() throws RocksDBException;
public abstract void resumeSnapshot(String snapshotPath);

public abstract Path buildSnapshotPath(Path originDataPath,
String snapshotPrefix,
boolean deleteSnapshot)
throws RocksDBException;

public abstract void reloadRocksDB() throws RocksDBException;

public abstract void forceCloseRocksDB();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package com.baidu.hugegraph.backend.store.rocksdb;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -33,9 +35,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
Expand Down Expand Up @@ -252,7 +256,7 @@ public synchronized void dropTable(String... tables)
}

@Override
public void reload() throws RocksDBException {
public void reloadRocksDB() throws RocksDBException {
if (this.rocksdb.isOwningHandle()) {
this.rocksdb.close();
}
Expand Down Expand Up @@ -320,33 +324,72 @@ public RocksDBSessions copy(HugeConfig config,
}

@Override
public RocksDB createSnapshotRocksDB(String snapshotPath)
throws RocksDBException {
// Init CFs options
Set<String> mergedCFs = this.mergeOldCFs(snapshotPath, new ArrayList<>(
this.cfs.keySet()));
List<String> cfNames = ImmutableList.copyOf(mergedCFs);

List<ColumnFamilyDescriptor> cfds = new ArrayList<>(cfNames.size());
for (String cf : cfNames) {
ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
ColumnFamilyOptions options = cfd.getOptions();
RocksDBStdSessions.initOptions(this.config, null, null,
options, options);
cfds.add(cfd);
public Path buildSnapshotPath(Path originDataPath, String snapshotPrefix,
boolean deleteSnapshot)
throws RocksDBException {
// originDataPath
Linary marked this conversation as resolved.
Show resolved Hide resolved
// Like: parent_path/rocksdb-data/m
// parent_path/rocksdb-vertex/g
Linary marked this conversation as resolved.
Show resolved Hide resolved
Path parentParentPath = originDataPath.getParent().getParent();
// Like: rocksdb-data/m
// rocksdb-vertex/g
Path pureDataPath = parentParentPath.relativize(originDataPath);
// Like: parent_path/snapshot_rocksdb-data/m
// parent_path/snapshot_rocksdb-vertex/g
Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
pureDataPath);
E.checkState(snapshotPath.toFile().exists(),
"The snapshot path '%s' doesn't exist",
snapshotPath);
LOG.debug("The origin data path: {}", originDataPath);
if (deleteSnapshot) {
LOG.debug("The snapshot data path: {}", snapshotPath);
return snapshotPath;
}
List<ColumnFamilyHandle> cfhs = new ArrayList<>();

// Init DB options
DBOptions options = new DBOptions();
RocksDBStdSessions.initOptions(this.config, options, options,
null, null);
return RocksDB.open(options, snapshotPath, cfds, cfhs);
RocksDB rocksdb = this.createSnapshotRocksDB(snapshotPath.toString());
Linary marked this conversation as resolved.
Show resolved Hide resolved
Path snapshotLinkPath = Paths.get(originDataPath + "_link");
try {
createCheckpoint(rocksdb, snapshotLinkPath.toString());
Linary marked this conversation as resolved.
Show resolved Hide resolved
} finally {
rocksdb.close();
}
LOG.debug("The snapshot data link path: {}", snapshotLinkPath);
return snapshotLinkPath;
}

@Override
public void createSnapshot(String snapshotPath) {
RocksDBStore.createCheckpoint(this.rocksdb, snapshotPath);
createCheckpoint(this.rocksdb, snapshotPath);
Linary marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void resumeSnapshot(String snapshotPath) {
File originDataDir = new File(this.dataPath);
File snapshotDir = new File(snapshotPath);
try {
/*
* Close current instance first
* NOTE: must close rocksdb instance before deleting file directory,
* if close after copying the snapshot directory to origin position,
* it may produce dirty data.
*/
this.forceCloseRocksDB();
// Delete origin data directory
if (originDataDir.exists()) {
LOG.info("Delete origin data directory {}", originDataDir);
FileUtils.deleteDirectory(originDataDir);
}
// Move snapshot directory to origin data directory
FileUtils.moveDirectory(snapshotDir, originDataDir);
LOG.info("Move snapshot directory {} to {}",
snapshotDir, originDataDir);
// Reload rocksdb instance
this.reloadRocksDB();
} catch (Exception e) {
throw new BackendException("Failed to resume snapshot '%s' to' %s'",
e, snapshotDir, this.dataPath);
}
}

@Override
Expand Down Expand Up @@ -421,6 +464,30 @@ private void ingestExternalFile() throws RocksDBException {
}
}

private RocksDB createSnapshotRocksDB(String snapshotPath)
Linary marked this conversation as resolved.
Show resolved Hide resolved
throws RocksDBException {
// Init CFs options
Set<String> mergedCFs = this.mergeOldCFs(snapshotPath, new ArrayList<>(
this.cfs.keySet()));
List<String> cfNames = ImmutableList.copyOf(mergedCFs);

List<ColumnFamilyDescriptor> cfds = new ArrayList<>(cfNames.size());
for (String cf : cfNames) {
ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
ColumnFamilyOptions options = cfd.getOptions();
RocksDBStdSessions.initOptions(this.config, null, null,
options, options);
cfds.add(cfd);
}
List<ColumnFamilyHandle> cfhs = new ArrayList<>();

// Init DB options
DBOptions options = new DBOptions();
RocksDBStdSessions.initOptions(this.config, options, options,
null, null);
return RocksDB.open(options, snapshotPath, cfds, cfhs);
}

public static Set<String> listCFs(String path) throws RocksDBException {
Set<String> cfs = new HashSet<>();

Expand Down Expand Up @@ -626,6 +693,29 @@ public static final String decode(byte[] bytes) {
return StringEncoding.decode(bytes);
}

public static void createCheckpoint(RocksDB rocksdb, String targetPath) {
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(rocksdb)) {
String tempPath = targetPath + "_temp";
File tempFile = new File(tempPath);
FileUtils.deleteDirectory(tempFile);
LOG.debug("Deleted temp directory {}", tempFile);

FileUtils.forceMkdir(tempFile.getParentFile());
checkpoint.createCheckpoint(tempPath);
File snapshotFile = new File(targetPath);
FileUtils.deleteDirectory(snapshotFile);
LOG.debug("Deleted stale directory {}", snapshotFile);
if (!tempFile.renameTo(snapshotFile)) {
throw new IOException(String.format("Failed to rename %s to %s",
tempFile, snapshotFile));
}
} catch (Exception e) {
throw new BackendException("Failed to create checkpoint at path %s",
e, targetPath);
}
}

private class CFHandle implements Closeable {

private final ColumnFamilyHandle handle;
Expand Down
Loading