Skip to content

Commit

Permalink
Fix bug: Backend metrics of rocksdb disk usage can't work
Browse files Browse the repository at this point in the history
Fix #323

Change-Id: I11e57d3f7247b77853675f04173632021113cb5f
  • Loading branch information
Linary authored and zhoney committed Feb 12, 2019
1 parent 05669ed commit 0239d49
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,71 @@

package com.baidu.hugegraph.backend.store.rocksdb;

import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;

import com.baidu.hugegraph.backend.store.BackendMetrics;
import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions.Session;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.InsertionOrderUtil;

public class RocksDBMetrics implements BackendMetrics {

private static final String INDEX_FILTER =
"rocksdb.estimate-table-readers-mem";
private static final String MEM_TABLE = "rocksdb.cur-size-all-mem-tables";
private static final String R_DATA_SIZE = "rocksdb.estimate-live-data-size";
public static final String BLOCK_CACHE = "rocksdb.block-cache-usage";
public static final String INDEX_FILTER =
"rocksdb.estimate-table-readers-mem";
public static final String MEM_TABLE = "rocksdb.cur-size-all-mem-tables";

private final Session session;
public static final String DISK_USAGE = "rocksdb.disk-usage";

public RocksDBMetrics(Session session) {
private final List<RocksDBSessions> dbs;
private final RocksDBSessions.Session session;

public RocksDBMetrics(List<RocksDBSessions> dbs,
RocksDBSessions.Session session) {
this.dbs = dbs;
this.session = session;
}

@Override
public Map<String, Object> getMetrics() {
Map<String, Object> metrics = InsertionOrderUtil.newMap();
// NOTE: the unit of rocksdb mem property is kb
metrics.put(MEM_USED, this.getMemUsed() / Bytes.BASE);
// NOTE: the unit of rocksdb mem property is bytes
metrics.put(MEM_USED, this.getMemUsed() / Bytes.MB);
metrics.put(MEM_UNIT, "MB");
String size = FileUtils.byteCountToDisplaySize(this.getDataSize());
metrics.put(DATA_SIZE, size);
return metrics;
}

private long getMemUsed() {
long indexFilter = Long.parseLong(this.session.property(INDEX_FILTER));
long memtable = Long.parseLong(this.session.property(MEM_TABLE));
return indexFilter + memtable;
private double getMemUsed() {
double blockCache = this.sum(this.session, BLOCK_CACHE);
double indexFilter = this.sum(this.session, INDEX_FILTER);
double memtable = this.sum(this.session, MEM_TABLE);
return blockCache + indexFilter + memtable;
}

private long getDataSize() {
return Long.parseLong(this.session.property(R_DATA_SIZE));
return (long) this.sum(DISK_USAGE);
}

private double sum(RocksDBSessions.Session session, String property) {
double total = 0;
for (RocksDBSessions db : this.dbs) {
total += Double.parseDouble(db.property(property));
for (String table : db.openedTables()) {
total += Double.parseDouble(session.property(table, property));
}
}
return total;
}

private double sum(String property) {
double total = 0;
for (RocksDBSessions db : this.dbs) {
total += Double.parseDouble(db.property(property));
}
return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public RocksDBSessions(String database, String store) {
public abstract void createTable(String table) throws RocksDBException;
public abstract void dropTable(String table) throws RocksDBException;

public abstract String property(String property);

@Override
public abstract Session session();

Expand All @@ -54,7 +56,6 @@ public static abstract class Session extends BackendSession {
public static final int SCAN_LT_END = 0x10;
public static final int SCAN_LTE_END = 0x30;

public abstract String property(String property);
public abstract String property(String table, String property);

public abstract void put(String table, byte[] key, byte[] value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.DBOptionsInterface;
import org.rocksdb.Env;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.MutableColumnFamilyOptionsInterface;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.SstFileManager;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

Expand All @@ -64,6 +66,7 @@ public class RocksDBStdSessions extends RocksDBSessions {

private final HugeConfig conf;
private final RocksDB rocksdb;
private final SstFileManager sstFileManager;

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store)
Expand All @@ -77,6 +80,9 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
RocksDBStdSessions.initOptions(this.conf, options, options, options);
options.setWalDir(walPath);

this.sstFileManager = new SstFileManager(Env.getDefault());
options.setSstFileManager(this.sstFileManager);

/*
* Open RocksDB at the first time
* Don't merge old CFs, we expect a clear DB when using this one
Expand Down Expand Up @@ -108,6 +114,9 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
RocksDBStdSessions.initOptions(this.conf, options, null, null);
options.setWalDir(walPath);

this.sstFileManager = new SstFileManager(Env.getDefault());
options.setSstFileManager(this.sstFileManager);

// Open RocksDB with CFs
List<ColumnFamilyHandle> cfhs = new ArrayList<>();
this.rocksdb = RocksDB.open(options, dataPath, cfds, cfhs);
Expand Down Expand Up @@ -164,6 +173,18 @@ public void dropTable(String table) throws RocksDBException {
this.cfs.remove(table);
}

@Override
public String property(String property) {
try {
if (property.equals(RocksDBMetrics.DISK_USAGE)) {
return String.valueOf(this.sstFileManager.getTotalSize());
}
return rocksdb().getProperty(property);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

@Override
public final synchronized Session session() {
return (Session) super.getOrNewSession();
Expand Down Expand Up @@ -398,18 +419,6 @@ public boolean hasChanges() {
return this.batch.count() > 0;
}

/**
* Get property value
*/
@Override
public String property(String property) {
try {
return rocksdb().getProperty(property);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

/**
* Get property value by name from specified table
*/
Expand Down Expand Up @@ -450,7 +459,11 @@ public Integer commit() {
*/
@Override
public void put(String table, byte[] key, byte[] value) {
this.batch.put(cf(table), key, value);
try {
this.batch.put(cf(table), key, value);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

/**
Expand All @@ -460,7 +473,11 @@ public void put(String table, byte[] key, byte[] value) {
*/
@Override
public void merge(String table, byte[] key, byte[] value) {
this.batch.merge(cf(table), key, value);
try {
this.batch.merge(cf(table), key, value);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

/**
Expand All @@ -480,7 +497,11 @@ public void increase(String table, byte[] key, byte[] value) {
*/
@Override
public void remove(String table, byte[] key) {
this.batch.remove(cf(table), key);
try {
this.batch.singleDelete(cf(table), key);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

/**
Expand All @@ -491,15 +512,23 @@ public void delete(String table, byte[] key) {
byte[] keyFrom = key;
byte[] keyTo = Arrays.copyOf(key, key.length);
keyTo = BinarySerializer.increaseOne(keyTo);
this.batch.deleteRange(cf(table), keyFrom, keyTo);
try {
this.batch.deleteRange(cf(table), keyFrom, keyTo);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

/**
* Delete a range of keys from a table
*/
@Override
public void delete(String table, byte[] keyFrom, byte[] keyTo) {
this.batch.deleteRange(cf(table), keyFrom, keyTo);
try {
this.batch.deleteRange(cf(table), keyFrom, keyTo);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -86,7 +87,11 @@ public RocksDBStore(final BackendStoreProvider provider,

private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
RocksDBMetrics metrics = new RocksDBMetrics(session);
List<RocksDBSessions> dbs = new ArrayList<>();
dbs.add(sessions);
dbs.addAll(tableDBMapping().values());

RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
return metrics.getMetrics();
});
}
Expand Down Expand Up @@ -243,6 +248,16 @@ protected String wrapPath(String path) {
return Paths.get(path, this.store).toString();
}

protected Map<String, RocksDBSessions> tableDBMapping() {
Map<String, RocksDBSessions> tableDBMap = InsertionOrderUtil.newMap();
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
RocksDBSessions db = db(e.getValue());
tableDBMap.put(table, db);
}
return tableDBMap;
}

@Override
public void close() {
LOG.debug("Store close: {}", this.store);
Expand Down Expand Up @@ -305,10 +320,9 @@ public void init() {
}

// Create table with optimized disk
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
RocksDBSessions db = db(e.getValue());
this.createTable(db, table);
Map<String, RocksDBSessions> tableDBMap = this.tableDBMapping();
for (Map.Entry<String, RocksDBSessions> e : tableDBMap.entrySet()) {
this.createTable(e.getValue(), e.getKey());
}

LOG.debug("Store initialized: {}", this.store);
Expand All @@ -332,10 +346,9 @@ public void clear() {
}

// Drop table with optimized disk
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
RocksDBSessions db = db(e.getValue());
this.dropTable(db, table);
Map<String, RocksDBSessions> tableDBMap = this.tableDBMapping();
for (Map.Entry<String, RocksDBSessions> e : tableDBMap.entrySet()) {
this.dropTable(e.getValue(), e.getKey());
}

LOG.debug("Store cleared: {}", this.store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public void dropTable(String table) throws RocksDBException {
this.tables.remove(table);
}

@Override
public String property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
}

private SstFileWriter table(String table) {
SstFileWriter sst = this.tables.get(table);
if (sst == null) {
Expand Down Expand Up @@ -217,14 +222,6 @@ public Integer commit() {
return count;
}

/**
* Get property value
*/
@Override
public String property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
}

/**
* Get property value by name from specified table
*/
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
<httpclient.version>4.5.2</httpclient.version>
<datastax.cassandra.version>3.2.0</datastax.cassandra.version>
<apache.cassandra.version>3.10</apache.cassandra.version>
<rocksdb.version>5.8.6</rocksdb.version>
<rocksdb.version>5.17.2</rocksdb.version>
<hbase.client.version>2.0.0</hbase.client.version>
<mysql.driver.version>5.1.45</mysql.driver.version>
<jersey.version>2.25.1</jersey.version>
Expand Down

0 comments on commit 0239d49

Please sign in to comment.