From 699d03be0fdf193c85aa19e5f96c27d46e0635aa Mon Sep 17 00:00:00 2001 From: liningrui Date: Fri, 26 Mar 2021 15:59:43 +0800 Subject: [PATCH] Refactor some code in RocksDBStdSessions Change-Id: I0561be1a71322644465e745ded9ca1636bff078a --- .../store/rocksdb/RocksDBSessions.java | 10 +- .../store/rocksdb/RocksDBStdSessions.java | 349 +++++++++--------- .../backend/store/rocksdb/RocksDBStore.java | 20 +- .../store/rocksdbsst/RocksDBSstSessions.java | 13 +- 4 files changed, 201 insertions(+), 191 deletions(-) diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java index 7172635e56..e92f358a4d 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java @@ -19,13 +19,13 @@ package com.baidu.hugegraph.backend.store.rocksdb; -import java.nio.file.Path; import java.util.List; import java.util.Set; import org.apache.commons.lang3.tuple.Pair; import org.rocksdb.RocksDBException; +import com.alipay.sofa.jraft.storage.snapshot.remote.Session; import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator; import com.baidu.hugegraph.backend.store.BackendSession.AbstractBackendSession; import com.baidu.hugegraph.backend.store.BackendSessionPool; @@ -52,10 +52,10 @@ public abstract RocksDBSessions copy(HugeConfig config, public abstract void resumeSnapshot(String snapshotPath); - public abstract Path buildSnapshotPath(Path originDataPath, - String snapshotPrefix, - boolean deleteSnapshot) - throws RocksDBException; + public abstract String buildSnapshotPath(String snapshotPrefix); + + public abstract String hardLinkSnapshot(String snapshotPath) + throws RocksDBException; public abstract void reloadRocksDB() throws RocksDBException; diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index e4026bc296..586d9b0aca 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -82,10 +82,7 @@ public class RocksDBStdSessions extends RocksDBSessions { private final String dataPath; private final String walPath; - private volatile RocksDB rocksdb; - private final SstFileManager sstFileManager; - - private final Map cfs; + private volatile OpenedRocksDB rocksdb; private final AtomicInteger refCount; public RocksDBStdSessions(HugeConfig config, String database, String store, @@ -95,22 +92,8 @@ public RocksDBStdSessions(HugeConfig config, String database, String store, this.config = config; this.dataPath = dataPath; this.walPath = walPath; - // Init options - Options options = new Options(); - RocksDBStdSessions.initOptions(config, options, options, - options, options); - options.setWalDir(walPath); - - this.sstFileManager = new SstFileManager(Env.getDefault()); - options.setSstFileManager(this.sstFileManager); - - /* - * Open RocksDB at the first time - * Don't merge old CFs, we expect a clear DB when using this one - */ - this.rocksdb = RocksDB.open(options, dataPath); - - this.cfs = new ConcurrentHashMap<>(); + this.rocksdb = RocksDBStdSessions.openRocksDB(config, dataPath, + walPath); this.refCount = new AtomicInteger(1); } @@ -121,43 +104,11 @@ public RocksDBStdSessions(HugeConfig config, String database, String store, this.config = config; this.dataPath = dataPath; this.walPath = walPath; - // Old CFs should always be opened - Set mergedCFs = this.mergeOldCFs(dataPath, cfNames); - List cfs = ImmutableList.copyOf(mergedCFs); - - // Init CFs options - List cfds = new ArrayList<>(cfs.size()); - for (String cf : cfs) { - ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf)); - ColumnFamilyOptions options = cfd.getOptions(); - RocksDBStdSessions.initOptions(config, null, null, - options, options); - cfds.add(cfd); - } - - // Init DB options - DBOptions options = new DBOptions(); - RocksDBStdSessions.initOptions(config, options, options, null, null); - options.setWalDir(walPath); - - this.sstFileManager = new SstFileManager(Env.getDefault()); - options.setSstFileManager(this.sstFileManager); - - // Open RocksDB with CFs - List cfhs = new ArrayList<>(); - this.rocksdb = RocksDB.open(options, dataPath, cfds, cfhs); - E.checkState(cfhs.size() == cfs.size(), - "Expect same size of cf-handles and cf-names"); - - // Collect CF Handles - this.cfs = new ConcurrentHashMap<>(); - for (int i = 0; i < cfs.size(); i++) { - this.cfs.put(cfs.get(i), new CFHandle(cfhs.get(i))); - } - + this.rocksdb = RocksDBStdSessions.openRocksDB(config, cfNames, + dataPath, walPath); this.refCount = new AtomicInteger(1); - ingestExternalFile(); + this.ingestExternalFile(); } private RocksDBStdSessions(HugeConfig config, String database, String store, @@ -167,10 +118,7 @@ private RocksDBStdSessions(HugeConfig config, String database, String store, this.dataPath = origin.dataPath; this.walPath = origin.walPath; this.rocksdb = origin.rocksdb; - this.sstFileManager = origin.sstFileManager; - this.cfs = origin.cfs; this.refCount = origin.refCount; - this.refCount.incrementAndGet(); } @@ -181,12 +129,12 @@ public void open() throws Exception { @Override protected boolean opened() { - return this.rocksdb != null && this.rocksdb.isOwningHandle(); + return this.rocksdb != null && this.rocksdb.rocksdb.isOwningHandle(); } @Override public Set openedTables() { - return this.cfs.keySet(); + return this.rocksdb.cfs(); } @Override @@ -196,7 +144,7 @@ public synchronized void createTable(String... tables) List cfds = new ArrayList<>(); for (String table : tables) { - if (this.cfs.containsKey(table)) { + if (this.rocksdb.existCf(table)) { continue; } ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor( @@ -210,14 +158,14 @@ public synchronized void createTable(String... tables) * To speed up the creation of tables, like truncate() for tinkerpop * test, we call createColumnFamilies instead of createColumnFamily. */ - List cfhs = this.rocksdb.createColumnFamilies(cfds); + List cfhs = this.rocksdb().createColumnFamilies(cfds); for (ColumnFamilyHandle cfh : cfhs) { String table = decode(cfh.getName()); - this.cfs.put(table, new CFHandle(cfh)); + this.rocksdb.addCf(table, new CFHandle(cfh)); } - ingestExternalFile(); + this.ingestExternalFile(); } @Override @@ -232,7 +180,7 @@ public synchronized void dropTable(String... tables) */ List cfhs = new ArrayList<>(); for (String table : tables) { - CFHandle cfh = this.cfs.get(table); + CFHandle cfh = this.rocksdb.cf(table); if (cfh == null) { continue; } @@ -243,49 +191,32 @@ public synchronized void dropTable(String... tables) * To speed up the creation of tables, like truncate() for tinkerpop * test, we call dropColumnFamilies instead of dropColumnFamily. */ - this.rocksdb.dropColumnFamilies(cfhs); + this.rocksdb().dropColumnFamilies(cfhs); for (String table : tables) { - CFHandle cfh = this.cfs.get(table); + CFHandle cfh = this.rocksdb.cf(table); if (cfh == null) { continue; } cfh.destroy(); - this.cfs.remove(table); + this.rocksdb.removeCf(table); } } + @Override + public boolean existsTable(String table) { + return this.rocksdb.existCf(table); + } + @Override public void reloadRocksDB() throws RocksDBException { if (this.rocksdb.isOwningHandle()) { this.rocksdb.close(); } - this.cfs.values().forEach(CFHandle::destroy); - // Init CFs options - Set mergedCFs = this.mergeOldCFs(this.dataPath, new ArrayList<>( - this.cfs.keySet())); - List cfNames = ImmutableList.copyOf(mergedCFs); - - List cfds = new ArrayList<>(cfNames.size()); - for (String cf : cfNames) { - ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf)); - ColumnFamilyOptions options = cfd.getOptions(); - RocksDBStdSessions.initOptions(this.config, null, null, - options, options); - cfds.add(cfd); - } - List cfhs = new ArrayList<>(); - - // Init DB options - DBOptions options = new DBOptions(); - RocksDBStdSessions.initOptions(this.config, options, options, - null, null); - options.setWalDir(this.walPath); - options.setSstFileManager(this.sstFileManager); - this.rocksdb = RocksDB.open(options, this.dataPath, cfds, cfhs); - for (int i = 0; i < cfNames.size(); i++) { - this.cfs.put(cfNames.get(i), new CFHandle(cfhs.get(i))); - } + this.rocksdb = RocksDBStdSessions.openRocksDB(this.config, + ImmutableList.of(), + this.dataPath, + this.walPath); } @Override @@ -293,22 +224,17 @@ public void forceCloseRocksDB() { this.rocksdb().close(); } - @Override - public boolean existsTable(String table) { - return this.cfs.containsKey(table); - } - @Override public List property(String property) { try { if (property.equals(RocksDBMetrics.DISK_USAGE)) { - long size = this.sstFileManager.getTotalSize(); + long size = this.rocksdb.sstFileManager.getTotalSize(); return ImmutableList.of(String.valueOf(size)); } List values = new ArrayList<>(); for (String cf : this.openedTables()) { - try (CFHandle cfh = cf(cf)) { - values.add(rocksdb().getProperty(cfh.get(), property)); + try (CFHandle cfh = this.cf(cf)) { + values.add(this.rocksdb().getProperty(cfh.get(), property)); } } return values; @@ -323,44 +249,9 @@ public RocksDBSessions copy(HugeConfig config, return new RocksDBStdSessions(config, database, store, this); } - @Override - public Path buildSnapshotPath(Path originDataPath, String snapshotPrefix, - boolean deleteSnapshot) - throws RocksDBException { - // originDataPath - // Like: parent_path/rocksdb-data/m - // parent_path/rocksdb-vertex/g - Path parentParentPath = originDataPath.getParent().getParent(); - // Like: rocksdb-data/m - // rocksdb-vertex/g - Path pureDataPath = parentParentPath.relativize(originDataPath); - // Like: parent_path/snapshot_rocksdb-data/m - // parent_path/snapshot_rocksdb-vertex/g - Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" + - pureDataPath); - E.checkState(snapshotPath.toFile().exists(), - "The snapshot path '%s' doesn't exist", - snapshotPath); - LOG.debug("The origin data path: {}", originDataPath); - if (deleteSnapshot) { - LOG.debug("The snapshot data path: {}", snapshotPath); - return snapshotPath; - } - - RocksDB rocksdb = this.createSnapshotRocksDB(snapshotPath.toString()); - Path snapshotLinkPath = Paths.get(originDataPath + "_link"); - try { - this.createCheckpoint(rocksdb, snapshotLinkPath.toString()); - } finally { - rocksdb.close(); - } - LOG.debug("The snapshot data link path: {}", snapshotLinkPath); - return snapshotLinkPath; - } - @Override public void createSnapshot(String snapshotPath) { - this.createCheckpoint(this.rocksdb, snapshotPath); + RocksDBStdSessions.createCheckpoint(this.rocksdb(), snapshotPath); } @Override @@ -392,6 +283,37 @@ public void resumeSnapshot(String snapshotPath) { } } + @Override + public String buildSnapshotPath(String snapshotPrefix) { + Path originDataPath = Paths.get(this.dataPath); + // originDataPath + // Like: parent_path/rocksdb-data/m + // parent_path/rocksdb-vertex/g + Path parentParentPath = originDataPath.getParent().getParent(); + // Like: rocksdb-data/m + // rocksdb-vertex/g + Path pureDataPath = parentParentPath.relativize(originDataPath); + // Like: parent_path/snapshot_rocksdb-data/m + // parent_path/snapshot_rocksdb-vertex/g + Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" + + pureDataPath); + E.checkState(snapshotPath.toFile().exists(), + "The snapshot path '%s' doesn't exist", + snapshotPath); + return snapshotPath.toString(); + } + + @Override + public String hardLinkSnapshot(String snapshotPath) throws RocksDBException { + String snapshotLinkPath = this.dataPath + "_link"; + try (RocksDB rocksdb = openRocksDB(this.config, ImmutableList.of(), + snapshotPath, null).rocksdb) { + RocksDBStdSessions.createCheckpoint(rocksdb, snapshotLinkPath); + } + LOG.debug("The snapshot data link path: {}", snapshotLinkPath); + return snapshotLinkPath; + } + @Override public final Session session() { return (Session) super.getOrNewSession(); @@ -412,12 +334,6 @@ protected synchronized void doClose() { return; } assert this.refCount.get() == 0; - - for (CFHandle cf : this.cfs.values()) { - cf.close(); - } - this.cfs.clear(); - this.rocksdb.close(); } @@ -428,67 +344,104 @@ private void checkValid() { private RocksDB rocksdb() { this.checkValid(); - return this.rocksdb; + return this.rocksdb.rocksdb; } - private CFHandle cf(String cf) { - CFHandle cfh = this.cfs.get(cf); + private CFHandle cf(String cfName) { + CFHandle cfh = this.rocksdb.cf(cfName); if (cfh == null) { - throw new BackendException("Table '%s' is not opened", cf); + throw new BackendException("Table '%s' is not opened", cfName); } cfh.open(); return cfh; } - private Set mergeOldCFs(String path, List cfNames) - throws RocksDBException { - Set cfs = listCFs(path); - cfs.addAll(cfNames); - return cfs; - } - private void ingestExternalFile() throws RocksDBException { String directory = this.config().get(RocksDBOptions.SST_PATH); if (directory == null || directory.isEmpty()) { return; } - RocksDBIngester ingester = new RocksDBIngester(this.rocksdb); + RocksDBIngester ingester = new RocksDBIngester(this.rocksdb()); // Ingest all *.sst files in `directory` - for (String cf : this.cfs.keySet()) { + for (String cf : this.rocksdb.cfs()) { Path path = Paths.get(directory, cf); if (path.toFile().isDirectory()) { - try (CFHandle cfh = cf(cf)) { + try (CFHandle cfh = this.cf(cf)) { ingester.ingest(path, cfh.get()); } } } } - private RocksDB createSnapshotRocksDB(String snapshotPath) - throws RocksDBException { - // Init CFs options - Set mergedCFs = this.mergeOldCFs(snapshotPath, new ArrayList<>( - this.cfs.keySet())); - List cfNames = ImmutableList.copyOf(mergedCFs); + private static OpenedRocksDB openRocksDB(HugeConfig config, + String dataPath, String walPath) + throws RocksDBException { + // Init options + Options options = new Options(); + RocksDBStdSessions.initOptions(config, options, options, + options, options); + options.setWalDir(walPath); + SstFileManager sstFileManager = new SstFileManager(Env.getDefault()); + options.setSstFileManager(sstFileManager); + /* + * Open RocksDB at the first time + * Don't merge old CFs, we expect a clear DB when using this one + */ + RocksDB rocksdb = RocksDB.open(options, dataPath); + Map cfs = new ConcurrentHashMap<>(); + return new OpenedRocksDB(rocksdb, cfs, sstFileManager); + } - List cfds = new ArrayList<>(cfNames.size()); - for (String cf : cfNames) { + private static OpenedRocksDB openRocksDB(HugeConfig config, + List cfNames, + String dataPath, String walPath) + throws RocksDBException { + // Old CFs should always be opened + Set mergedCFs = RocksDBStdSessions.mergeOldCFs(dataPath, + cfNames); + List cfs = ImmutableList.copyOf(mergedCFs); + + // Init CFs options + List cfds = new ArrayList<>(cfs.size()); + for (String cf : cfs) { ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf)); ColumnFamilyOptions options = cfd.getOptions(); - RocksDBStdSessions.initOptions(this.config, null, null, + RocksDBStdSessions.initOptions(config, null, null, options, options); cfds.add(cfd); } - List cfhs = new ArrayList<>(); // Init DB options DBOptions options = new DBOptions(); - RocksDBStdSessions.initOptions(this.config, options, options, - null, null); - return RocksDB.open(options, snapshotPath, cfds, cfhs); + RocksDBStdSessions.initOptions(config, options, options, null, null); + if (walPath != null) { + options.setWalDir(walPath); + } + SstFileManager sstFileManager = new SstFileManager(Env.getDefault()); + options.setSstFileManager(sstFileManager); + + // Open RocksDB with CFs + List cfhs = new ArrayList<>(); + RocksDB rocksdb = RocksDB.open(options, dataPath, cfds, cfhs); + + E.checkState(cfhs.size() == cfs.size(), + "Expect same size of cf-handles and cf-names"); + // Collect CF Handles + Map cfHandles = new ConcurrentHashMap<>(); + for (int i = 0; i < cfs.size(); i++) { + cfHandles.put(cfs.get(i), new CFHandle(cfhs.get(i))); + } + return new OpenedRocksDB(rocksdb, cfHandles, sstFileManager); } - private void createCheckpoint(RocksDB rocksdb, String targetPath) { + private static Set mergeOldCFs(String path, List cfNames) + throws RocksDBException { + Set cfs = listCFs(path); + cfs.addAll(cfNames); + return cfs; + } + + private static void createCheckpoint(RocksDB rocksdb, String targetPath) { // https://github.com/facebook/rocksdb/wiki/Checkpoints try (Checkpoint checkpoint = Checkpoint.create(rocksdb)) { String tempPath = targetPath + "_temp"; @@ -716,7 +669,57 @@ public static final String decode(byte[] bytes) { return StringEncoding.decode(bytes); } - private class CFHandle implements Closeable { + private static class OpenedRocksDB { + + private final RocksDB rocksdb; + private final Map cfHandles; + private final SstFileManager sstFileManager; + + public OpenedRocksDB(RocksDB rocksdb, Map cfHandles, + SstFileManager sstFileManager) { + this.rocksdb = rocksdb; + this.cfHandles = cfHandles; + this.sstFileManager = sstFileManager; + } + + public Set cfs() { + return this.cfHandles.keySet(); + } + + public CFHandle cf(String cfName) { + return this.cfHandles.get(cfName); + } + + public void addCf(String cfName, CFHandle cfHandle) { + this.cfHandles.put(cfName, cfHandle); + } + + public CFHandle removeCf(String cfName) { + return this.cfHandles.remove(cfName); + } + + public boolean existCf(String cfName) { + return this.cfHandles.containsKey(cfName); + } + + public boolean isOwningHandle() { + return this.rocksdb.isOwningHandle(); + } + + public void close() { + if (!this.isOwningHandle()) { + return; + } + for (CFHandle cf : this.cfHandles.values()) { + cf.close(); + } + this.cfHandles.clear(); + + this.rocksdb.close(); + } + } + + private static class CFHandle implements Closeable { private final ColumnFamilyHandle handle; private final AtomicInteger refs; diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index 544baa911f..380c3ad968 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -647,25 +647,27 @@ public void resumeSnapshot(String snapshotPrefix, boolean deleteSnapshot) { if (!this.opened()) { return; } - Map snapshotPaths = new HashMap<>(); + Map snapshotPaths = new HashMap<>(); for (Map.Entry entry : this.dbs.entrySet()) { - Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath(); RocksDBSessions sessions = entry.getValue(); - Path snapshotPath = sessions.buildSnapshotPath(originDataPath, - snapshotPrefix, - deleteSnapshot); + String snapshotPath = sessions.buildSnapshotPath(snapshotPrefix); + LOG.debug("The origin data path: {}", entry.getKey()); + if (!deleteSnapshot) { + snapshotPath = sessions.hardLinkSnapshot(snapshotPath); + } + LOG.debug("The snapshot data path: {}", snapshotPath); snapshotPaths.put(snapshotPath, sessions); } - for (Map.Entry entry : + for (Map.Entry entry : snapshotPaths.entrySet()) { - Path snapshotPath = entry.getKey(); + String snapshotPath = entry.getKey(); RocksDBSessions sessions = entry.getValue(); - sessions.resumeSnapshot(snapshotPath.toString()); + sessions.resumeSnapshot(snapshotPath); if (deleteSnapshot) { // Delete empty snapshot parent directory - Path parentPath = snapshotPath.getParent(); + Path parentPath = Paths.get(snapshotPath).getParent(); if (Files.list(parentPath).count() == 0) { FileUtils.deleteDirectory(parentPath.toFile()); } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index 4a5a97ae7d..b1b222cf9f 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -154,14 +154,19 @@ public void resumeSnapshot(String snapshotPath) { } @Override - public Path buildSnapshotPath(Path originDataPath, String snapshotPrefix, - boolean deleteSnapshot) { - throw new UnsupportedOperationException("hardLink"); + public String buildSnapshotPath(String snapshotPrefix) { + throw new UnsupportedOperationException("buildSnapshotPath"); + } + + @Override + public String hardLinkSnapshot(String snapshotPath) + throws RocksDBException { + throw new UnsupportedOperationException("hardLinkSnapshot"); } @Override public void reloadRocksDB() throws RocksDBException { - throw new UnsupportedOperationException("reload"); + throw new UnsupportedOperationException("reloadRocksDB"); } @Override