From 5b9b3e78d704d9b3071bbdf71f6119e9d40910df Mon Sep 17 00:00:00 2001
From: liningrui
Date: Thu, 29 Apr 2021 21:20:11 +0800
Subject: [PATCH 1/3] Remove decompressSnapshot operation when resuming snapshot

Change-Id: I974d9ec3d05eed02123191a8fe0916163fe59811
---
 .../backend/store/raft/StoreSnapshotFile.java | 23 ++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
index 30ac92a7b1..9638b2e3e1 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -65,6 +65,11 @@ public void save(SnapshotWriter writer, Closure done,
                 String jraftSnapshotPath = this.writeManifest(writer,
                                                               snapshotDirs,
                                                               done);
+                /*
+                 * Compression must be performed, otherwise jraft will not
+                 * load the snapshot on restart, even though we don't need this
+                 * compressed file when actually loading the snapshot
+                 */
                 this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
             });
         } catch (Throwable e) {
@@ -82,21 +87,13 @@ public boolean load(SnapshotReader reader) {
             LOG.error("Can't find snapshot archive file, path={}", readerPath);
             return false;
         }
-        String jraftSnapshotPath = Paths.get(readerPath, SNAPSHOT_DIR)
-                                        .toString();
         try {
-            // Decompress manifest and data directory
-            this.decompressSnapshot(readerPath, meta);
+            /*
+             * Don't perform decompression here, it may trigger a bug where
+             * IOUtils.skip() falls into an infinite loop. The root cause of
+             * this bug is not known yet.
+             */
             this.doSnapshotLoad();
-            File tmp = new File(jraftSnapshotPath);
-            // Delete the decompressed temporary file. If the deletion fails
-            // (although it is a small probability event), it may affect the
-            // next snapshot decompression result. Therefore, the safest way
-            // is to terminate the state machine immediately. Users can choose
-            // to manually delete and restart according to the log information.
-            if (tmp.exists()) {
-                FileUtils.forceDelete(tmp);
-            }
             return true;
         } catch (Throwable e) {
             LOG.error("Failed to load snapshot", e);

From 945371cee6a0517d04ec50b43e7801a76e192551 Mon Sep 17 00:00:00 2001
From: liningrui
Date: Thu, 13 May 2021 17:22:06 +0800
Subject: [PATCH 2/3] Support snapshot install and resume for newly added nodes

Change-Id: I0bb904a967eaada26b51b9b47bbfde5d31653bd1
---
 .../hugegraph/backend/store/BackendStore.java |   4 +-
 .../backend/store/raft/StoreSnapshotFile.java | 176 +++++++++++-------
 .../backend/store/raft/StoreStateMachine.java |   5 +-
 .../store/rocksdb/RocksDBStdSessions.java     |   4 +-
 .../backend/store/rocksdb/RocksDBStore.java   |  38 +++-
 5 files changed, 144 insertions(+), 83 deletions(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
index 7c55da1a3f..bd1727790b 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
@@ -20,7 +20,7 @@
 package com.baidu.hugegraph.backend.store;
 
 import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
 
 import com.baidu.hugegraph.backend.id.Id;
 import com.baidu.hugegraph.backend.id.IdGenerator;
@@ -117,7 +117,7 @@ public default void setCounterLowest(HugeType type, long lowest) {
     // Get current counter for a specific type
     public long getCounter(HugeType type);
 
-    public default Set<String> createSnapshot(String snapshotDir) {
+    public default Map<String, String> createSnapshot(String snapshotDir) {
         throw new UnsupportedOperationException("createSnapshot");
     }
 
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
index 9638b2e3e1..f1f11a7d51 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -22,11 +22,15 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.zip.Checksum;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 import com.alipay.sofa.jraft.Closure;
@@ -36,10 +40,12 @@
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
 import com.alipay.sofa.jraft.util.CRC64;
+import com.baidu.hugegraph.testutil.Whitebox;
 import com.baidu.hugegraph.util.CompressUtil;
 import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
 import com.baidu.hugegraph.util.Log;
+import com.google.protobuf.ByteString;
 
 public class StoreSnapshotFile {
 
@@ -47,30 +53,35 @@ public class StoreSnapshotFile {
 
     public static final String SNAPSHOT_DIR = "snapshot";
     private static final String TAR = ".tar";
-    private static final String SNAPSHOT_TAR = SNAPSHOT_DIR + TAR;
-    private static final String MANIFEST = "manifest";
 
     private final RaftBackendStore[] stores;
+    private final Map<String, String> dataDisks;
 
     public StoreSnapshotFile(RaftBackendStore[] stores) {
         this.stores = stores;
+        this.dataDisks = new HashMap<>();
+        for (RaftBackendStore raftStore : stores) {
+            // Call RocksDBStore method reportDiskMapping()
+            this.dataDisks.putAll(Whitebox.invoke(raftStore, "store",
+                                                  "reportDiskMapping"));
+        }
+        /*
+         * For example:
+         * general=/parent_path/rocksdb-data
+         * g/VERTEX=/parent_path/rocksdb-vertex
+         */
+        LOG.debug("The store data disks mapping: {}", this.dataDisks);
     }
 
     public void save(SnapshotWriter writer, Closure done,
                      ExecutorService executor) {
         try {
             // Write snapshot to real directory
-            Set<String> snapshotDirs = this.doSnapshotSave();
+            Map<String, String> snapshotDirMaps = this.doSnapshotSave();
             executor.execute(() -> {
-                String jraftSnapshotPath = this.writeManifest(writer,
-                                                              snapshotDirs,
-                                                              done);
-                /*
-                 * Compression must be performed, otherwise jraft will not
-                 * load the snapshot on restart, even though we don't need this
-                 * compressed file when actually loading the snapshot
-                 */
-                this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
+                this.compressSnapshotDir(writer, snapshotDirMaps, done);
+                this.deleteSnapshotDirs(snapshotDirMaps.keySet());
+                done.run(Status.OK());
             });
         } catch (Throwable e) {
             LOG.error("Failed to save snapshot", e);
@@ -81,35 +92,38 @@ public void save(SnapshotWriter writer, Closure done,
     }
 
     public boolean load(SnapshotReader reader) {
-        LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_TAR);
-        String readerPath = reader.getPath();
-        if (meta == null) {
-            LOG.error("Can't find snapshot archive file, path={}", readerPath);
-            return false;
+        Set<String> snapshotDirTars = reader.listFiles();
+        LOG.info("The snapshot tar files to be loaded are {}", snapshotDirTars);
+        Set<String> snapshotDirs = new HashSet<>();
+        for (String snapshotDirTar : snapshotDirTars) {
+            try {
+                String snapshotDir = this.decompressSnapshot(reader,
+                                                             snapshotDirTar);
+                snapshotDirs.add(snapshotDir);
+            } catch (Throwable e) {
+                LOG.error("Failed to decompress snapshot tar", e);
+                return false;
+            }
         }
+
         try {
-            /*
-             * Don't perform decompression here, it may trigger a bug where
-             * IOUtils.skip() falls into an infinite loop. The root cause of
-             * this bug is not known yet.
-             */
             this.doSnapshotLoad();
-            return true;
+            this.deleteSnapshotDirs(snapshotDirs);
         } catch (Throwable e) {
             LOG.error("Failed to load snapshot", e);
             return false;
         }
+        return true;
     }
 
-    private Set<String> doSnapshotSave() {
-        Set<String> snapshotDirs = InsertionOrderUtil.newSet();
+    private Map<String, String> doSnapshotSave() {
+        Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
         for (RaftBackendStore store : this.stores) {
-            Set<String> snapshots = store.originStore()
-                                         .createSnapshot(SNAPSHOT_DIR);
-            snapshotDirs.addAll(snapshots);
+            snapshotDirMaps.putAll(store.originStore()
+                                        .createSnapshot(SNAPSHOT_DIR));
         }
-        LOG.info("Saved all snapshots: {}", snapshotDirs);
-        return snapshotDirs;
+        LOG.info("Saved all snapshots: {}", snapshotDirMaps);
+        return snapshotDirMaps;
     }
 
     private void doSnapshotLoad() {
@@ -118,55 +132,66 @@ private void doSnapshotLoad() {
         }
     }
 
-    private String writeManifest(SnapshotWriter writer,
-                                 Set<String> snapshotFiles,
-                                 Closure done) {
-        String writerPath = writer.getPath();
-        // Write all backend compressed snapshot file path to manifest
-        String jraftSnapshotPath = Paths.get(writerPath, SNAPSHOT_DIR)
-                                        .toString();
-        File snapshotManifestFile = new File(jraftSnapshotPath, MANIFEST);
-        try {
-            FileUtils.writeLines(snapshotManifestFile, snapshotFiles);
-        } catch (IOException e) {
-            done.run(new Status(RaftError.EIO,
-                                "Failed to write backend snapshot file path " +
-                                "to manifest"));
-        }
-        return jraftSnapshotPath;
-    }
-
-    private void compressJraftSnapshotDir(SnapshotWriter writer,
-                                          String jraftSnapshotPath,
-                                          Closure done) {
+    private void compressSnapshotDir(SnapshotWriter writer,
+                                     Map<String, String> snapshotDirMaps,
+                                     Closure done) {
         String writerPath = writer.getPath();
-        String outputFile = Paths.get(writerPath, SNAPSHOT_TAR).toString();
-        try {
-            LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
-            Checksum checksum = new CRC64();
-            CompressUtil.compressTar(jraftSnapshotPath, outputFile, checksum);
-            metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
-            if (writer.addFile(SNAPSHOT_TAR, metaBuilder.build())) {
-                done.run(Status.OK());
-            } else {
+        for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
+            String snapshotDir = entry.getKey();
+            String hugeTypeKey = entry.getValue();
+            String snapshotDirTar = Paths.get(snapshotDir).getFileName()
+                                         .toString() + TAR;
+            String outputFile = Paths.get(writerPath, snapshotDirTar)
+                                     .toString();
+            try {
+                LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
+                Checksum checksum = new CRC64();
+                CompressUtil.compressTar(snapshotDir, outputFile, checksum);
+                metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
+                /*
+                 * snapshot_rocksdb-data.tar -> general
+                 * snapshot_rocksdb-vertex.tar -> g/VERTEX
+                 */
+                metaBuilder.setUserMeta(ByteString.copyFromUtf8(hugeTypeKey));
+                if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
+                    done.run(new Status(RaftError.EIO,
+                                        "Failed to add snapshot file: '%s'",
+                                        writerPath));
+                }
+            } catch (Throwable e) {
+                LOG.error("Failed to compress snapshot, path={}, files={}, {}.",
+                          writerPath, writer.listFiles(), e);
                 done.run(new Status(RaftError.EIO,
-                                    "Failed to add snapshot file: '%s'",
-                                    writerPath));
+                                    "Failed to compress snapshot '%s' due to: %s",
+                                    writerPath, e.getMessage()));
             }
-        } catch (Throwable e) {
-            LOG.error("Failed to compress snapshot, path={}, files={}, {}.",
-                      writerPath, writer.listFiles(), e);
-            done.run(new Status(RaftError.EIO,
-                                "Failed to compress snapshot '%s' due to: %s",
-                                writerPath, e.getMessage()));
         }
     }
 
-    private void decompressSnapshot(String readerPath, LocalFileMeta meta)
-            throws IOException {
-        String archiveFile = Paths.get(readerPath, SNAPSHOT_TAR).toString();
+    private String decompressSnapshot(SnapshotReader reader,
+                                      String snapshotDirTar)
+                                      throws IOException {
+        LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
+        if (meta == null) {
+            throw new IOException("Can't find snapshot archive file, path=" +
+                                  snapshotDirTar);
+        }
+
+        String hugeTypeKey = meta.getUserMeta().toStringUtf8();
+        E.checkArgument(this.dataDisks.containsKey(hugeTypeKey),
+                        "The data path for '%s' should exist", hugeTypeKey);
+        String dataPath = this.dataDisks.get(hugeTypeKey);
+        String parentPath = Paths.get(dataPath).getParent().toString();
+        String snapshotDir = Paths.get(parentPath,
+                                       StringUtils.removeEnd(snapshotDirTar, TAR))
+                                  .toString();
+        FileUtils.deleteDirectory(new File(snapshotDir));
+        LOG.info("Deleted stale snapshot dir {}", snapshotDir);
+
         Checksum checksum = new CRC64();
-        CompressUtil.decompressTar(archiveFile, readerPath, checksum);
+        String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
+                                  .toString();
+        CompressUtil.decompressTar(archiveFile, parentPath, checksum);
         if (meta.hasChecksum()) {
             String expected = meta.getChecksum();
             String actual = Long.toHexString(checksum.getValue());
@@ -174,5 +199,12 @@ private void decompressSnapshot(String readerPath, LocalFileMeta meta)
                           "Snapshot checksum error: '%s' != '%s'",
                           actual, expected);
         }
+        return snapshotDir;
+    }
+
+    private void deleteSnapshotDirs(Set<String> snapshotDirs) {
+        for (String snapshotDir : snapshotDirs) {
+            FileUtils.deleteQuietly(new File(snapshotDir));
+        }
     }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index 8d61e775a2..3506b6ca18 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -48,6 +48,7 @@
 import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
 import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.type.define.GraphMode;
+import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.LZ4Util;
 import com.baidu.hugegraph.util.Log;
 
@@ -162,7 +163,9 @@ public void onApply(Iterator iter) {
 
     private void applyCommand(StoreType type, StoreAction action,
                               BytesBuffer buffer, boolean forwarded) {
-        BackendStore store = type != StoreType.ALL ? this.store(type) : null;
+        E.checkState(type != StoreType.ALL,
+                     "Can't apply command to all stores at once");
+        BackendStore store = this.store(type);
         switch (action) {
             case CLEAR:
                 boolean clearSpace = buffer.read() > 0;
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index c8dbfbcbcb..f12ba4c127 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -306,8 +306,8 @@ public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
                               snapshotPath, null).rocksdb) {
             RocksDBStdSessions.createCheckpoint(rocksdb, snapshotLinkPath);
         }
-        LOG.debug("The snapshot {} has been hard linked to {}",
-                  snapshotPath, snapshotLinkPath);
+        LOG.info("The snapshot {} has been hard linked to {}",
+                 snapshotPath, snapshotLinkPath);
         return snapshotLinkPath;
     }
 
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 9cd31e530c..ccb61af5c5 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -81,6 +81,7 @@ public abstract class RocksDBStore extends AbstractBackendStore {
 
     private final BackendStoreProvider provider;
     private final Map<HugeType, RocksDBTable> tables;
+    private String dataPath;
     private RocksDBSessions sessions;
     private final Map<HugeType, String> tableDiskMapping;
     // DataPath:RocksDB mapping
@@ -166,6 +167,7 @@ public synchronized void open(HugeConfig config) {
         LOG.debug("Store open: {}", this.store);
 
         E.checkNotNull(config, "config");
+        this.dataPath = config.get(RocksDBOptions.DATA_PATH);
 
         if (this.sessions != null && !this.sessions.closed()) {
             LOG.debug("Store {} has been opened before", this.store);
@@ -185,8 +187,7 @@ public synchronized void open(HugeConfig config) {
         Map<String, String> disks = config.getMap(RocksDBOptions.DATA_DISKS);
         Set<String> openedDisks = new HashSet<>();
         if (!disks.isEmpty()) {
-            String dataPath = config.get(RocksDBOptions.DATA_PATH);
-            this.parseTableDiskMapping(disks, dataPath);
+            this.parseTableDiskMapping(disks, this.dataPath);
             for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
                 String table = this.table(e.getKey()).table();
                 String disk = e.getValue();
@@ -607,11 +608,11 @@ protected Session session(HugeType tableType) {
     }
 
     @Override
-    public Set<String> createSnapshot(String snapshotPrefix) {
+    public Map<String, String> createSnapshot(String snapshotPrefix) {
         Lock readLock = this.storeLock.readLock();
         readLock.lock();
         try {
-            Set<String> uniqueParents = new HashSet<>();
+            Map<String, String> uniqueSnapshotDirMaps = new HashMap<>();
             // Every rocksdb instance should create an snapshot
             for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
                 // Like: parent_path/rocksdb-data/*, * maybe g,m,s
@@ -627,10 +628,13 @@ public Set<String> createSnapshot(String snapshotPrefix) {
                 RocksDBSessions sessions = entry.getValue();
                 sessions.createSnapshot(snapshotPath.toString());
 
-                uniqueParents.add(snapshotPath.getParent().toString());
+                String snapshotDir = snapshotPath.getParent().toString();
+                // Find the corresponding data HugeType key
+                String hugeTypeKey = this.findHugeTypeKey(entry.getKey());
+                uniqueSnapshotDirMaps.put(snapshotDir, hugeTypeKey);
             }
             LOG.info("The store '{}' create snapshot successfully", this);
successfully", this); - return uniqueParents; + return uniqueSnapshotDirMaps; } finally { readLock.unlock(); } @@ -742,6 +746,28 @@ private final void parseTableDiskMapping(Map disks, } } + private Map reportDiskMapping() { + Map diskMapping = new HashMap<>(); + diskMapping.put("general", this.dataPath); + for (Map.Entry e : this.tableDiskMapping.entrySet()) { + String key = this.store + "/" + e.getKey().name(); + String value = Paths.get(e.getValue()).getParent().toString(); + diskMapping.put(key, value); + } + return diskMapping; + } + + private String findHugeTypeKey(String diskPath) { + String hugeTypeKey = "general"; + for (Map.Entry e : this.tableDiskMapping.entrySet()) { + if (diskPath.equals(e.getValue())) { + hugeTypeKey = this.store + "/" + e.getKey().name(); + break; + } + } + return hugeTypeKey; + } + private final void checkDbOpened() { E.checkState(this.sessions != null && !this.sessions.closed(), "RocksDB has not been opened"); From 55583901c2911b80c56acb7f68b1b6bc1f1f11ae Mon Sep 17 00:00:00 2001 From: liningrui Date: Tue, 18 May 2021 11:13:47 +0800 Subject: [PATCH 3/3] tiny improve Change-Id: I3b0117fb9eddf2f4775f17ef1a30775367a48020 --- .../backend/store/raft/StoreSnapshotFile.java | 60 ++++++++++--------- .../backend/store/rocksdb/RocksDBStore.java | 16 ++--- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java index f1f11a7d51..54a8b9c32b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java @@ -79,9 +79,16 @@ public void save(SnapshotWriter writer, Closure done, // Write snapshot to real directory Map snapshotDirMaps = this.doSnapshotSave(); executor.execute(() -> { - this.compressSnapshotDir(writer, snapshotDirMaps, done); - this.deleteSnapshotDirs(snapshotDirMaps.keySet()); - done.run(Status.OK()); + try { + this.compressSnapshotDir(writer, snapshotDirMaps); + this.deleteSnapshotDirs(snapshotDirMaps.keySet()); + done.run(Status.OK()); + } catch (Throwable e) { + LOG.error("Failed to compress snapshot", e); + done.run(new Status(RaftError.EIO, + "Failed to compress snapshot, " + + "error is %s", e.getMessage())); + } }); } catch (Throwable e) { LOG.error("Failed to save snapshot", e); @@ -133,37 +140,34 @@ private void doSnapshotLoad() { } private void compressSnapshotDir(SnapshotWriter writer, - Map snapshotDirMaps, - Closure done) { + Map snapshotDirMaps) { String writerPath = writer.getPath(); for (Map.Entry entry : snapshotDirMaps.entrySet()) { String snapshotDir = entry.getKey(); - String hugeTypeKey = entry.getValue(); + String diskTableKey = entry.getValue(); String snapshotDirTar = Paths.get(snapshotDir).getFileName() .toString() + TAR; String outputFile = Paths.get(writerPath, snapshotDirTar) .toString(); + Checksum checksum = new CRC64(); try { - LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder(); - Checksum checksum = new CRC64(); CompressUtil.compressTar(snapshotDir, outputFile, checksum); - metaBuilder.setChecksum(Long.toHexString(checksum.getValue())); - /* - * snapshot_rocksdb-data.tar -> general - * snapshot_rocksdb-vertex.tar -> g/VERTEX - */ - metaBuilder.setUserMeta(ByteString.copyFromUtf8(hugeTypeKey)); - if (!writer.addFile(snapshotDirTar, metaBuilder.build())) { - done.run(new Status(RaftError.EIO, 
- "Failed to add snapshot file: '%s'", - writerPath)); - } } catch (Throwable e) { - LOG.error("Failed to compress snapshot, path={}, files={}, {}.", - writerPath, writer.listFiles(), e); - done.run(new Status(RaftError.EIO, - "Failed to compress snapshot '%s' due to: %s", - writerPath, e.getMessage())); + throw new RaftException( + "Failed to compress snapshot, path=%s, files=%s", + e, writerPath, snapshotDirMaps.keySet()); + } + + LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder(); + metaBuilder.setChecksum(Long.toHexString(checksum.getValue())); + /* + * snapshot_rocksdb-data.tar -> general + * snapshot_rocksdb-vertex.tar -> g/VERTEX + */ + metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey)); + if (!writer.addFile(snapshotDirTar, metaBuilder.build())) { + throw new RaftException("Failed to add snapshot file: '%s'", + snapshotDirTar); } } } @@ -177,10 +181,10 @@ private String decompressSnapshot(SnapshotReader reader, snapshotDirTar); } - String hugeTypeKey = meta.getUserMeta().toStringUtf8(); - E.checkArgument(this.dataDisks.containsKey(hugeTypeKey), - "The data path for '%s' should be exist", hugeTypeKey); - String dataPath = this.dataDisks.get(hugeTypeKey); + String diskTableKey = meta.getUserMeta().toStringUtf8(); + E.checkArgument(this.dataDisks.containsKey(diskTableKey), + "The data path for '%s' should be exist", diskTableKey); + String dataPath = this.dataDisks.get(diskTableKey); String parentPath = Paths.get(dataPath).getParent().toString(); String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR)) diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index ccb61af5c5..d470ec4e6e 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -88,6 +88,7 @@ public abstract class RocksDBStore extends AbstractBackendStore { private final ConcurrentMap dbs; private final ReadWriteLock storeLock; + private static final String TABLE_GENERAL_KEY = "general"; private static final String DB_OPEN = "db-open-%s"; private static final long OPEN_TIMEOUT = 600L; /* @@ -630,8 +631,9 @@ public Map createSnapshot(String snapshotPrefix) { String snapshotDir = snapshotPath.getParent().toString(); // Find correspond data HugeType key - String hugeTypeKey = this.findHugeTypeKey(entry.getKey()); - uniqueSnapshotDirMaps.put(snapshotDir, hugeTypeKey); + String diskTableKey = this.findDiskTableKeyByPath( + entry.getKey()); + uniqueSnapshotDirMaps.put(snapshotDir, diskTableKey); } LOG.info("The store '{}' create snapshot successfully", this); return uniqueSnapshotDirMaps; @@ -748,7 +750,7 @@ private final void parseTableDiskMapping(Map disks, private Map reportDiskMapping() { Map diskMapping = new HashMap<>(); - diskMapping.put("general", this.dataPath); + diskMapping.put(TABLE_GENERAL_KEY, this.dataPath); for (Map.Entry e : this.tableDiskMapping.entrySet()) { String key = this.store + "/" + e.getKey().name(); String value = Paths.get(e.getValue()).getParent().toString(); @@ -757,15 +759,15 @@ private Map reportDiskMapping() { return diskMapping; } - private String findHugeTypeKey(String diskPath) { - String hugeTypeKey = "general"; + private String findDiskTableKeyByPath(String diskPath) { + String diskTableKey = TABLE_GENERAL_KEY; for (Map.Entry e : 
             if (diskPath.equals(e.getValue())) {
-                hugeTypeKey = this.store + "/" + e.getKey().name();
+                diskTableKey = this.store + "/" + e.getKey().name();
                 break;
             }
         }
-        return hugeTypeKey;
+        return diskTableKey;
     }
 
     private final void checkDbOpened() {
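
The three patches above cooperate through one small contract: every snapshot directory is archived as "<dir-name>.tar" and tagged with a disk-table key ("general", or "<store>/<TABLE>"), and on load that same key is looked up in the mapping reported by reportDiskMapping() to decide where to extract the archive. The standalone sketch below illustrates that round trip; the class name and paths are hypothetical, and plain java.nio stands in for jraft's LocalFileMeta user meta and the real CompressUtil calls.

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the snapshot disk-mapping round trip. The mapping values
    // here are hypothetical; real values come from reportDiskMapping().
    public class SnapshotMappingSketch {

        public static void main(String[] args) {
            // disk-table key -> data path, as reported by each store
            Map<String, String> dataDisks = new HashMap<>();
            dataDisks.put("general", "/parent_path/rocksdb-data");
            dataDisks.put("g/VERTEX", "/parent_path/rocksdb-vertex");

            for (Map.Entry<String, String> e : dataDisks.entrySet()) {
                String diskTableKey = e.getKey();
                Path dataPath = Paths.get(e.getValue());

                // Save side: the snapshot dir "snapshot_<disk-name>" is
                // tarred as "<dir-name>.tar" and tagged with its key (the
                // real code stores the key in LocalFileMeta user meta)
                Path snapshotDir = dataPath.resolveSibling(
                                   "snapshot_" + dataPath.getFileName());
                String tarName = snapshotDir.getFileName() + ".tar";
                System.out.println(tarName + " -> " + diskTableKey);

                // Load side: the key read back from user meta selects the
                // data disk, and the tar is extracted beside it
                Path restoreParent = Paths.get(dataDisks.get(diskTableKey))
                                          .getParent();
                System.out.println("extract " + tarName +
                                   " under " + restoreParent);
            }
        }
    }

Running the sketch prints "snapshot_rocksdb-data.tar -> general" and "snapshot_rocksdb-vertex.tar -> g/VERTEX", matching the mapping comments embedded in patch 2 and patch 3.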