diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java index caca4d76e2..59397ae1e1 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java @@ -19,14 +19,10 @@ package com.baidu.hugegraph.backend.store.rocksdb; -import java.io.IOException; -import java.nio.file.Paths; import java.util.Set; -import org.apache.commons.io.FileUtils; import org.rocksdb.RocksDBException; -import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator; import com.baidu.hugegraph.backend.store.BackendSession; import com.baidu.hugegraph.backend.store.BackendSessionPool; @@ -48,21 +44,6 @@ public RocksDBSessions(String database, String store) { @Override public abstract Session session(); - public String wrapPath(String path) { - return wrapPath(path, this.store); - } - - public static String wrapPath(String path, String store) { - // Ensure the `path` exists - try { - FileUtils.forceMkdir(FileUtils.getFile(path)); - } catch (IOException e) { - throw new BackendException(e.getMessage(), e); - } - // Join with store type - return Paths.get(path, store).toString(); - } - /** * Session for RocksDB */ diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 404ad34273..a6da8133e7 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -62,15 +62,13 @@ public class RocksDBStdSessions extends RocksDBSessions { private final HugeConfig conf; private final RocksDB rocksdb; - public RocksDBStdSessions(HugeConfig config, String database, String store) + public RocksDBStdSessions(HugeConfig config, String dataPath, + String walPath, String database, String store) throws RocksDBException { super(database, store); this.conf = config; - String dataPath = wrapPath(this.conf.get(RocksDBOptions.DATA_PATH)); - String walPath = wrapPath(this.conf.get(RocksDBOptions.WAL_PATH)); - // Init options Options options = new Options(); RocksDBStdSessions.initOptions(this.conf, options, options, options); @@ -83,14 +81,12 @@ public RocksDBStdSessions(HugeConfig config, String database, String store) this.rocksdb = RocksDB.open(options, dataPath); } - public RocksDBStdSessions(HugeConfig config, String database, String store, + public RocksDBStdSessions(HugeConfig config, String dataPath, + String walPath, String database, String store, List cfNames) throws RocksDBException { super(database, store); this.conf = config; - String dataPath = wrapPath(this.conf.get(RocksDBOptions.DATA_PATH)); - String walPath = wrapPath(this.conf.get(RocksDBOptions.WAL_PATH)); - // Old CFs should always be opened List cfs = this.mergeOldCFs(dataPath, cfNames); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index 6056831a94..ed4002a417 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.backend.store.rocksdb; +import java.io.IOException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -29,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -129,9 +132,10 @@ public synchronized void open(HugeConfig config) { } // Open base disk - String dataPath = config.get(RocksDBOptions.DATA_PATH); - dataPath = RocksDBSessions.wrapPath(dataPath, this.store); - this.sessions = this.open(config, dataPath, this.tableNames()); + String dataPath = this.wrapPath(config.get(RocksDBOptions.DATA_PATH)); + String walPath = this.wrapPath(config.get(RocksDBOptions.WAL_PATH)); + + this.sessions = this.open(config, dataPath, walPath, this.tableNames()); // Open tables with optimized disk List disks = config.get(RocksDBOptions.DATA_DISKS); @@ -140,18 +144,19 @@ public synchronized void open(HugeConfig config) { for (Entry e : this.tableDiskMapping.entrySet()) { String table = this.table(e.getKey()).table(); String disk = e.getValue(); - this.open(config, disk, Arrays.asList(table)); + this.open(config, disk, disk, Arrays.asList(table)); } } } protected RocksDBSessions open(HugeConfig config, String dataPath, - List tableNames) { + String walPath, List tableNames) { LOG.info("Opening RocksDB with data path: {}", dataPath); RocksDBSessions sessions = null; try { - sessions = this.openSessionPool(config, tableNames); + sessions = this.openSessionPool(config, dataPath, + walPath, tableNames); } catch (RocksDBException e) { if (dbs.containsKey(dataPath)) { if (e.getMessage().contains("No locks available")) { @@ -162,7 +167,8 @@ protected RocksDBSessions open(HugeConfig config, String dataPath, try { // Will open old CFs(of other keyspace) final List none = ImmutableList.of(); - sessions = this.openSessionPool(config, none); + sessions = this.openSessionPool(config, dataPath, + walPath, none); } catch (RocksDBException e1) { // Let it throw later e = e1; @@ -174,7 +180,8 @@ protected RocksDBSessions open(HugeConfig config, String dataPath, "try to init CF later", dataPath, this.database); try { // Only open default CF, won't open old CFs - sessions = this.openSessionPool(config, null); + sessions = this.openSessionPool(config, dataPath, + walPath, null); } catch (RocksDBException e1) { LOG.error("Failed to open RocksDB with default CF", e1); } @@ -198,12 +205,15 @@ protected RocksDBSessions open(HugeConfig config, String dataPath, } protected RocksDBSessions openSessionPool(HugeConfig config, + String dataPath, String walPath, List tableNames) throws RocksDBException { if (tableNames == null) { - return new RocksDBStdSessions(config, this.database, this.store); + return new RocksDBStdSessions(config, dataPath, walPath, + this.database, this.store); } else { - return new RocksDBStdSessions(config, this.database, this.store, + return new RocksDBStdSessions(config, dataPath, walPath, + this.database, this.store, tableNames); } } @@ -401,7 +411,7 @@ private void parseTableDiskMapping(List disks) { String store = pair[0].trim(); HugeType table = HugeType.valueOf(pair[1].trim().toUpperCase()); if (this.store.equals(store)) { - path = RocksDBSessions.wrapPath(path, this.store); + path = this.wrapPath(path); this.tableDiskMapping.put(table, path); } } @@ -414,6 +424,17 @@ private static RocksDBSessions db(String disk) { return db; } + protected String wrapPath(String path) { + // Ensure the `path` exists + try { + FileUtils.forceMkdir(FileUtils.getFile(path)); + } catch (IOException e) { + throw new BackendException(e.getMessage(), e); + } + // Join with store type + return Paths.get(path, this.store).toString(); + } + /***************************** Store defines *****************************/ public static class RocksDBSchemaStore extends RocksDBStore { diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index f6d3617d91..de0e80cbbe 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -22,12 +22,8 @@ import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.tuple.Pair; @@ -38,7 +34,6 @@ import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator; -import com.baidu.hugegraph.backend.store.rocksdb.RocksDBOptions; import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions; import com.baidu.hugegraph.backend.store.rocksdb.RocksDBStdSessions; import com.baidu.hugegraph.config.HugeConfig; @@ -51,11 +46,12 @@ public class RocksDBSstSessions extends RocksDBSessions { private final String dataPath; private final Map tables; - public RocksDBSstSessions(HugeConfig conf, String database, String store) { + public RocksDBSstSessions(HugeConfig conf, String dataPath, + String database, String store) { super(database, store); this.conf = conf; - this.dataPath = this.wrapPath(this.conf.get(RocksDBOptions.DATA_PATH)); + this.dataPath = dataPath; this.tables = new ConcurrentHashMap<>(); File path = new File(this.dataPath); @@ -64,9 +60,10 @@ public RocksDBSstSessions(HugeConfig conf, String database, String store) { } } - public RocksDBSstSessions(HugeConfig config, String database, String store, + public RocksDBSstSessions(HugeConfig config, String dataPath, + String database, String store, List tableNames) throws RocksDBException { - this(config, database, store); + this(config, dataPath, database, store); for (String table : tableNames) { this.createTable(table); } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java index 6ce77f040a..8709a2aad8 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java @@ -40,13 +40,14 @@ public RocksDBSstStore(final BackendStoreProvider provider, @Override protected RocksDBSessions openSessionPool(HugeConfig config, + String dataPath, String walPath, List tableNames) throws RocksDBException { if (tableNames == null) { - return new RocksDBSstSessions(config, this.database(), + return new RocksDBSstSessions(config, dataPath, this.database(), this.store()); } else { - return new RocksDBSstSessions(config, this.database(), + return new RocksDBSstSessions(config, dataPath, this.database(), this.store(), tableNames); } } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java index ba6ddbae3c..ec600af08a 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java @@ -34,7 +34,6 @@ import org.mockito.Mockito; import org.rocksdb.RocksDBException; -import com.baidu.hugegraph.backend.store.rocksdb.RocksDBOptions; import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions; import com.baidu.hugegraph.backend.store.rocksdb.RocksDBStdSessions; import com.baidu.hugegraph.config.HugeConfig; @@ -117,9 +116,8 @@ private static RocksDBSessions open(String table) throws RocksDBException { Configuration conf = Mockito.mock(PropertiesConfiguration.class); Mockito.when(conf.getKeys()).thenReturn(Collections.emptyIterator()); HugeConfig config = new HugeConfig(conf); - config.setProperty(RocksDBOptions.DATA_PATH.name(), DB_PATH); - config.setProperty(RocksDBOptions.WAL_PATH.name(), DB_PATH); - RocksDBSessions rocks = new RocksDBStdSessions(config, "db", "store"); + RocksDBSessions rocks = new RocksDBStdSessions(config, DB_PATH, DB_PATH, + "db", "store"); rocks.createTable(table); return rocks; }