Skip to content

Commit

Permalink
fix bug init-store failed due to datapath and walpath error for rocks…
Browse files Browse the repository at this point in the history
…db backend

fix #22

Change-Id: Id86628f0073301a969d054d4e52b115ac9096c14
  • Loading branch information
zhoney committed Aug 22, 2018
1 parent 16fd728 commit 388847d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@

package com.baidu.hugegraph.backend.store.rocksdb;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import com.baidu.hugegraph.backend.store.BackendSession;
import com.baidu.hugegraph.backend.store.BackendSessionPool;
Expand All @@ -48,21 +44,6 @@ public RocksDBSessions(String database, String store) {
@Override
public abstract Session session();

public String wrapPath(String path) {
return wrapPath(path, this.store);
}

public static String wrapPath(String path, String store) {
// Ensure the `path` exists
try {
FileUtils.forceMkdir(FileUtils.getFile(path));
} catch (IOException e) {
throw new BackendException(e.getMessage(), e);
}
// Join with store type
return Paths.get(path, store).toString();
}

/**
* Session for RocksDB
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ public class RocksDBStdSessions extends RocksDBSessions {
private final HugeConfig conf;
private final RocksDB rocksdb;

public RocksDBStdSessions(HugeConfig config, String database, String store)
public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store)
throws RocksDBException {
super(database, store);

this.conf = config;

String dataPath = wrapPath(this.conf.get(RocksDBOptions.DATA_PATH));
String walPath = wrapPath(this.conf.get(RocksDBOptions.WAL_PATH));

// Init options
Options options = new Options();
RocksDBStdSessions.initOptions(this.conf, options, options, options);
Expand All @@ -83,14 +81,12 @@ public RocksDBStdSessions(HugeConfig config, String database, String store)
this.rocksdb = RocksDB.open(options, dataPath);
}

public RocksDBStdSessions(HugeConfig config, String database, String store,
public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store,
List<String> cfNames) throws RocksDBException {
super(database, store);
this.conf = config;

String dataPath = wrapPath(this.conf.get(RocksDBOptions.DATA_PATH));
String walPath = wrapPath(this.conf.get(RocksDBOptions.WAL_PATH));

// Old CFs should always be opened
List<String> cfs = this.mergeOldCFs(dataPath, cfNames);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package com.baidu.hugegraph.backend.store.rocksdb;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -29,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;

Expand Down Expand Up @@ -129,9 +132,10 @@ public synchronized void open(HugeConfig config) {
}

// Open base disk
String dataPath = config.get(RocksDBOptions.DATA_PATH);
dataPath = RocksDBSessions.wrapPath(dataPath, this.store);
this.sessions = this.open(config, dataPath, this.tableNames());
String dataPath = this.wrapPath(config.get(RocksDBOptions.DATA_PATH));
String walPath = this.wrapPath(config.get(RocksDBOptions.WAL_PATH));

this.sessions = this.open(config, dataPath, walPath, this.tableNames());

// Open tables with optimized disk
List<String> disks = config.get(RocksDBOptions.DATA_DISKS);
Expand All @@ -140,18 +144,19 @@ public synchronized void open(HugeConfig config) {
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
String disk = e.getValue();
this.open(config, disk, Arrays.asList(table));
this.open(config, disk, disk, Arrays.asList(table));
}
}
}

protected RocksDBSessions open(HugeConfig config, String dataPath,
List<String> tableNames) {
String walPath, List<String> tableNames) {
LOG.info("Opening RocksDB with data path: {}", dataPath);

RocksDBSessions sessions = null;
try {
sessions = this.openSessionPool(config, tableNames);
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
if (dbs.containsKey(dataPath)) {
if (e.getMessage().contains("No locks available")) {
Expand All @@ -162,7 +167,8 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
try {
// Will open old CFs(of other keyspace)
final List<String> none = ImmutableList.of();
sessions = this.openSessionPool(config, none);
sessions = this.openSessionPool(config, dataPath,
walPath, none);
} catch (RocksDBException e1) {
// Let it throw later
e = e1;
Expand All @@ -174,7 +180,8 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
"try to init CF later", dataPath, this.database);
try {
// Only open default CF, won't open old CFs
sessions = this.openSessionPool(config, null);
sessions = this.openSessionPool(config, dataPath,
walPath, null);
} catch (RocksDBException e1) {
LOG.error("Failed to open RocksDB with default CF", e1);
}
Expand All @@ -198,12 +205,15 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
}

protected RocksDBSessions openSessionPool(HugeConfig config,
String dataPath, String walPath,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBStdSessions(config, this.database, this.store);
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store);
} else {
return new RocksDBStdSessions(config, this.database, this.store,
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store,
tableNames);
}
}
Expand Down Expand Up @@ -401,7 +411,7 @@ private void parseTableDiskMapping(List<String> disks) {
String store = pair[0].trim();
HugeType table = HugeType.valueOf(pair[1].trim().toUpperCase());
if (this.store.equals(store)) {
path = RocksDBSessions.wrapPath(path, this.store);
path = this.wrapPath(path);
this.tableDiskMapping.put(table, path);
}
}
Expand All @@ -414,6 +424,17 @@ private static RocksDBSessions db(String disk) {
return db;
}

protected String wrapPath(String path) {
// Ensure the `path` exists
try {
FileUtils.forceMkdir(FileUtils.getFile(path));
} catch (IOException e) {
throw new BackendException(e.getMessage(), e);
}
// Join with store type
return Paths.get(path, this.store).toString();
}

/***************************** Store defines *****************************/

public static class RocksDBSchemaStore extends RocksDBStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -38,7 +34,6 @@

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBOptions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBStdSessions;
import com.baidu.hugegraph.config.HugeConfig;
Expand All @@ -51,11 +46,12 @@ public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;

public RocksDBSstSessions(HugeConfig conf, String database, String store) {
public RocksDBSstSessions(HugeConfig conf, String dataPath,
String database, String store) {
super(database, store);

this.conf = conf;
this.dataPath = this.wrapPath(this.conf.get(RocksDBOptions.DATA_PATH));
this.dataPath = dataPath;
this.tables = new ConcurrentHashMap<>();

File path = new File(this.dataPath);
Expand All @@ -64,9 +60,10 @@ public RocksDBSstSessions(HugeConfig conf, String database, String store) {
}
}

public RocksDBSstSessions(HugeConfig config, String database, String store,
public RocksDBSstSessions(HugeConfig config, String dataPath,
String database, String store,
List<String> tableNames) throws RocksDBException {
this(config, database, store);
this(config, dataPath, database, store);
for (String table : tableNames) {
this.createTable(table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public RocksDBSstStore(final BackendStoreProvider provider,

@Override
protected RocksDBSessions openSessionPool(HugeConfig config,
String dataPath, String walPath,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBSstSessions(config, this.database(),
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store());
} else {
return new RocksDBSstSessions(config, this.database(),
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store(), tableNames);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.mockito.Mockito;
import org.rocksdb.RocksDBException;

import com.baidu.hugegraph.backend.store.rocksdb.RocksDBOptions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBStdSessions;
import com.baidu.hugegraph.config.HugeConfig;
Expand Down Expand Up @@ -117,9 +116,8 @@ private static RocksDBSessions open(String table) throws RocksDBException {
Configuration conf = Mockito.mock(PropertiesConfiguration.class);
Mockito.when(conf.getKeys()).thenReturn(Collections.emptyIterator());
HugeConfig config = new HugeConfig(conf);
config.setProperty(RocksDBOptions.DATA_PATH.name(), DB_PATH);
config.setProperty(RocksDBOptions.WAL_PATH.name(), DB_PATH);
RocksDBSessions rocks = new RocksDBStdSessions(config, "db", "store");
RocksDBSessions rocks = new RocksDBStdSessions(config, DB_PATH, DB_PATH,
"db", "store");
rocks.createTable(table);
return rocks;
}
Expand Down

0 comments on commit 388847d

Please sign in to comment.