Merge pull request #7496 from petermattis/pmattis/shared-rocksdb-cache
storage/engine: share the block cache instead of partitioning it
petermattis authored Jun 28, 2016
2 parents f14578f + ab4721c commit 08d0ffa
Showing 8 changed files with 95 additions and 35 deletions.

cli/debug.go (3 additions, 1 deletion)

@@ -63,7 +63,9 @@ func parseRangeID(arg string) (roachpb.RangeID, error) {
 }
 
 func openStore(cmd *cobra.Command, dir string, stopper *stop.Stopper) (engine.Engine, error) {
-    db := engine.NewRocksDB(roachpb.Attributes{}, dir, 512<<20, 10<<20, 0, stopper)
+    cache := engine.NewRocksDBCache(512 << 20)
+    defer cache.Release()
+    db := engine.NewRocksDB(roachpb.Attributes{}, dir, cache, 10<<20, 0, stopper)
     if err := db.Open(); err != nil {
         return nil, err
     }
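
For readers following the change, here is a minimal, self-contained sketch of the new calling convention (import paths follow the repository layout at this commit; the cobra command parameter and the callers' surrounding wiring are omitted):

package example

import (
    "github.com/cockroachdb/cockroach/roachpb"
    "github.com/cockroachdb/cockroach/storage/engine"
    "github.com/cockroachdb/cockroach/util/stop"
)

// openStore mirrors the updated cli/debug.go: build a 512 MiB block cache,
// hand it to the engine, then drop the local handle. NewRocksDB takes its
// own reference, so the cache outlives the deferred Release and is freed
// only after the engine is closed.
func openStore(dir string, stopper *stop.Stopper) (engine.Engine, error) {
    cache := engine.NewRocksDBCache(512 << 20)
    defer cache.Release()
    db := engine.NewRocksDB(roachpb.Attributes{}, dir, cache, 10<<20, 0, stopper)
    if err := db.Open(); err != nil {
        return nil, err
    }
    return db, nil
}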

server/context.go (4 additions, 4 deletions)

@@ -227,9 +227,9 @@ func MakeContext() Context {
 
 // InitStores initializes ctx.Engines based on ctx.Stores.
 func (ctx *Context) InitStores(stopper *stop.Stopper) error {
-    // TODO(peter): The comments and docs say that CacheSize and MemtableBudget
-    // are split evenly if there are multiple stores, but we aren't doing that
-    // currently. See #4979 and #4980.
+    cache := engine.NewRocksDBCache(ctx.CacheSize)
+    defer cache.Release()
+
     for _, spec := range ctx.Stores.Specs {
         var sizeInBytes = spec.SizeInBytes
         if spec.InMemory {
@@ -258,7 +258,7 @@ func (ctx *Context) InitStores(stopper *stop.Stopper) error {
                     spec.SizePercent, spec.Path, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(minimumStoreSize))
             }
             ctx.Engines = append(ctx.Engines, engine.NewRocksDB(spec.Attributes, spec.Path,
-                ctx.CacheSize/int64(len(ctx.Stores.Specs)), ctx.MemtableBudget, sizeInBytes, stopper))
+                cache, ctx.MemtableBudget, sizeInBytes, stopper))
         }
     }
     if len(ctx.Engines) == 1 {
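
The substantive change here is that the cache budget is no longer divided per store (the deleted TODO notes the documented even split was not actually happening); instead every store's engine takes a reference to a single node-wide cache. A condensed sketch of that sharing, with the store-size bookkeeping of the full function elided and spec.SizeInBytes standing in for the computed size:

// One RocksDBCache for the whole node, shared by every store. Each
// NewRocksDB call refs the cache, so the deferred Release only drops the
// local handle; the cache lives as long as any engine does.
cache := engine.NewRocksDBCache(ctx.CacheSize)
defer cache.Release()

for _, spec := range ctx.Stores.Specs {
    ctx.Engines = append(ctx.Engines, engine.NewRocksDB(
        spec.Attributes, spec.Path, cache, ctx.MemtableBudget, spec.SizeInBytes, stopper))
}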

storage/engine/bench_rocksdb_test.go (1 addition, 2 deletions)

@@ -26,10 +26,9 @@ import (
 )
 
 func setupMVCCRocksDB(b testing.TB, loc string) (Engine, *stop.Stopper) {
-    const cacheSize = 0
     const memtableBudget = 512 << 20 // 512 MB
     stopper := stop.NewStopper()
-    rocksdb := NewRocksDB(roachpb.Attributes{}, loc, cacheSize, memtableBudget, 0, stopper)
+    rocksdb := NewRocksDB(roachpb.Attributes{}, loc, RocksDBCache{}, memtableBudget, 0, stopper)
     if err := rocksdb.Open(); err != nil {
         b.Fatalf("could not create new rocksdb db instance at %s: %v", loc, err)
     }
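
This benchmark helper (like the tests at the end of the diff) now passes a zero-value RocksDBCache{} rather than a cache size of 0. That is valid because ref() and Release() are no-ops when the wrapped C pointer is nil, and DBOpen leaves table_options.block_cache untouched when no cache is supplied, so RocksDB falls back to its built-in default. A short sketch of the pattern, reusing the helper's b and loc parameters:

// A standalone engine with no shared cache: callers that do not care about
// block-cache sizing simply pass the zero value.
stopper := stop.NewStopper()
rocksdb := NewRocksDB(roachpb.Attributes{}, loc, RocksDBCache{}, 512<<20, 0, stopper)
if err := rocksdb.Open(); err != nil {
    b.Fatalf("could not create new rocksdb db instance at %s: %v", loc, err)
}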

storage/engine/in_mem.go (3 additions, 1 deletion)

@@ -28,8 +28,10 @@ type InMem struct {
 
 // NewInMem allocates and returns a new, opened InMem engine.
 func NewInMem(attrs roachpb.Attributes, cacheSize int64, stopper *stop.Stopper) InMem {
+    cache := NewRocksDBCache(cacheSize)
+    defer cache.Release()
     db := InMem{
-        RocksDB: newMemRocksDB(attrs, cacheSize, 512<<20 /* 512 MB */, stopper),
+        RocksDB: newMemRocksDB(attrs, cache, 512<<20 /* 512 MB */, stopper),
     }
     if err := db.Open(); err != nil {
         panic(err)
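
Releasing the cache immediately after handing it to newMemRocksDB is intentional: the constructor takes its own reference via cache.ref() (see rocksdb.go below), so the underlying C cache survives until the engine itself is closed. A small usage sketch, assuming the engine is torn down through the stopper as in the package's tests:

stopper := stop.NewStopper()
defer stopper.Stop()
// An in-memory engine with a 1 MiB block cache; NewInMem builds and
// releases its own RocksDBCache internally.
eng := engine.NewInMem(roachpb.Attributes{}, 1<<20, stopper)
_ = eng // read and write through the Engine interface as usual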

storage/engine/rocksdb.go (43 additions, 7 deletions)

@@ -59,12 +59,37 @@ func init() {
     rocksdb.Logger = log.Infof
 }
 
+// RocksDBCache is a wrapper around C.DBCache
+type RocksDBCache struct {
+    cache *C.DBCache
+}
+
+// NewRocksDBCache creates a new cache of the specified size.
+func NewRocksDBCache(cacheSize int64) RocksDBCache {
+    return RocksDBCache{cache: C.DBNewCache(C.uint64_t(cacheSize))}
+}
+
+func (c RocksDBCache) ref() RocksDBCache {
+    if c.cache != nil {
+        c.cache = C.DBRefCache(c.cache)
+    }
+    return c
+}
+
+// Release releases the cache. Note that the cache will continue to be used
+// until all of the RocksDB engines it was attached to have been closed.
+func (c RocksDBCache) Release() {
+    if c.cache != nil {
+        C.DBReleaseCache(c.cache)
+    }
+}
+
 // RocksDB is a wrapper around a RocksDB database instance.
 type RocksDB struct {
     rdb   *C.DBEngine
     attrs roachpb.Attributes // Attributes for this engine
     dir   string             // The data directory
-    cacheSize      int64        // Memory to use to cache values.
+    cache          RocksDBCache // Shared cache.
     memtableBudget int64        // Memory to use for the memory table.
     maxSize        int64        // Used for calculating rebalancing and free space.
     stopper        *stop.Stopper
@@ -74,27 +99,37 @@ type RocksDB struct {
 var _ Engine = &RocksDB{}
 
 // NewRocksDB allocates and returns a new RocksDB object.
-func NewRocksDB(attrs roachpb.Attributes, dir string, cacheSize, memtableBudget, maxSize int64,
-    stopper *stop.Stopper) *RocksDB {
+func NewRocksDB(
+    attrs roachpb.Attributes,
+    dir string,
+    cache RocksDBCache,
+    memtableBudget, maxSize int64,
+    stopper *stop.Stopper,
+) *RocksDB {
     if dir == "" {
         panic("dir must be non-empty")
     }
     return &RocksDB{
         attrs:          attrs,
         dir:            dir,
-        cacheSize:      cacheSize,
+        cache:          cache.ref(),
         memtableBudget: memtableBudget,
         maxSize:        maxSize,
         stopper:        stopper,
         deallocated:    make(chan struct{}),
     }
 }
 
-func newMemRocksDB(attrs roachpb.Attributes, cacheSize, memtableBudget int64, stopper *stop.Stopper) *RocksDB {
+func newMemRocksDB(
+    attrs roachpb.Attributes,
+    cache RocksDBCache,
+    memtableBudget int64,
+    stopper *stop.Stopper,
+) *RocksDB {
     return &RocksDB{
         attrs: attrs,
         // dir: empty dir == "mem" RocksDB instance.
-        cacheSize:      cacheSize,
+        cache:          cache.ref(),
         memtableBudget: memtableBudget,
         stopper:        stopper,
         deallocated:    make(chan struct{}),
@@ -147,7 +182,7 @@ func (r *RocksDB) Open() error {
 
     status := C.DBOpen(&r.rdb, goToCSlice([]byte(r.dir)),
         C.DBOptions{
-            cache_size:      C.uint64_t(r.cacheSize),
+            cache:           r.cache.cache,
             memtable_budget: C.uint64_t(r.memtableBudget),
             block_size:      C.uint64_t(envutil.EnvOrDefaultBytes("rocksdb_block_size", defaultBlockSize)),
             wal_ttl_seconds: C.uint64_t(envutil.EnvOrDefaultDuration("rocksdb_wal_ttl", 0).Seconds()),
@@ -191,6 +226,7 @@ func (r *RocksDB) Close() {
         C.DBClose(r.rdb)
         r.rdb = nil
     }
+    r.cache.Release()
     close(r.deallocated)
 }
 
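
Taken together, the reference-counting contract is: NewRocksDB and newMemRocksDB each ref() the cache they are given, and Close releases that reference, so a caller may Release its own handle as soon as its engines are constructed; the zero value RocksDBCache{} is also accepted, since ref() and Release() check for a nil pointer. A hedged sketch of two engines sharing one cache (directories, budgets, and the stopper are placeholders):

// Two stores, one 1 GiB block cache. The creator's handle is released
// immediately; the underlying RocksDB cache is freed only after both
// engines have been closed.
cache := engine.NewRocksDBCache(1 << 30)
db1 := engine.NewRocksDB(roachpb.Attributes{}, "/mnt/store1", cache, 512<<20, 0, stopper)
db2 := engine.NewRocksDB(roachpb.Attributes{}, "/mnt/store2", cache, 512<<20, 0, stopper)
cache.Release()
// ... Open and use db1 and db2; each Close() drops its cache reference.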

storage/engine/rocksdb/db.cc (23 additions, 15 deletions)

@@ -42,6 +42,10 @@
 extern "C" {
 #include "_cgo_export.h"
 
+struct DBCache {
+  std::shared_ptr<rocksdb::Cache> rep;
+};
+
 struct DBEngine {
   rocksdb::DB* const rep;
 
@@ -1314,18 +1318,27 @@ DBBatch::DBBatch(DBEngine* db)
     updates(0) {
 }
 
-DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) {
-  // Divide the cache space into two levels: the fast row cache
-  // and the slower but more space-efficient block cache.
-  // TODO(bdarnell): do we need both? how much of each?
-  // TODO(peter): disabled for now until benchmarks show improvement.
-  const auto row_cache_size = 0 * db_opts.cache_size;
-  const auto block_cache_size = db_opts.cache_size - row_cache_size;
+DBCache* DBNewCache(uint64_t size) {
   const int num_cache_shard_bits = 4;
+  DBCache *cache = new DBCache;
+  cache->rep = rocksdb::NewLRUCache(size, num_cache_shard_bits);
+  return cache;
+}
+
+DBCache* DBRefCache(DBCache *cache) {
+  DBCache *res = new DBCache;
+  res->rep = cache->rep;
+  return res;
+}
+
+void DBReleaseCache(DBCache *cache) {
+  delete cache;
+}
+
+DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) {
   rocksdb::BlockBasedTableOptions table_options;
-  if (block_cache_size > 0) {
-    table_options.block_cache = rocksdb::NewLRUCache(
-        block_cache_size, num_cache_shard_bits);
+  if (db_opts.cache != nullptr) {
+    table_options.block_cache = db_opts.cache->rep;
   }
   // Pass false for use_blocked_base_builder creates a per file
   // (sstable) filter instead of a per-block filter. The per file
@@ -1366,11 +1379,6 @@ DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) {
   options.max_bytes_for_level_base = 512 << 20;
   options.max_bytes_for_level_multiplier = 8;
 
-  if (row_cache_size > 0) {
-    options.row_cache = rocksdb::NewLRUCache(
-        row_cache_size, num_cache_shard_bits);
-  }
-
   // Register listener for tracking RocksDB stats.
   std::shared_ptr<DBEventListener> event_listener(new DBEventListener);
   options.listeners.emplace_back(event_listener);
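
Two points in this hunk are easy to miss: the experimental row cache is removed outright (the deleted TODO says it had already been disabled pending benchmark evidence), and cache ownership is now carried by std::shared_ptr, so each DBCache handle is simply one co-owner of the same rocksdb::Cache (created with 4 shard bits, i.e. 16 shards). The Go sketch below only models those handle semantics for intuition; it is not the cgo implementation, and all names in it are invented:

package main

import "fmt"

// lruCache stands in for the shared rocksdb::Cache.
type lruCache struct{ size uint64 }

// handle models a DBCache*: DBNewCache returns the first handle, DBRefCache
// copies a handle, and DBReleaseCache drops one. The cache is freed when the
// last handle goes away, which std::shared_ptr does automatically (and
// thread-safely, unlike this sketch).
type handle struct {
    cache *lruCache
    refs  *int
}

func newCache(size uint64) *handle {
    n := 1
    return &handle{cache: &lruCache{size: size}, refs: &n}
}

func (h *handle) ref() *handle {
    *h.refs++
    return &handle{cache: h.cache, refs: h.refs}
}

func (h *handle) release() {
    *h.refs--
    if *h.refs == 0 {
        fmt.Println("last reference dropped; the cache would be freed here")
    }
}

func main() {
    c := newCache(1 << 30) // DBNewCache
    e := c.ref()           // DBRefCache, taken by an engine
    c.release()            // creator drops its handle; the cache survives
    e.release()            // engine closes; the cache is freed
}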

storage/engine/rocksdb/db.h (14 additions, 1 deletion)

@@ -54,19 +54,32 @@
 // operation. If DBStatus.data == NULL the operation succeeded.
 typedef DBString DBStatus;
 
+typedef struct DBCache DBCache;
 typedef struct DBEngine DBEngine;
 typedef struct DBIterator DBIterator;
 
 // DBOptions contains local database options.
 typedef struct {
-  uint64_t cache_size;
+  DBCache *cache;
   uint64_t memtable_budget;
   uint64_t block_size;
   uint64_t wal_ttl_seconds;
  bool allow_os_buffer;
   bool logging_enabled;
 } DBOptions;
 
+// Create a new cache with the specified size.
+DBCache* DBNewCache(uint64_t size);
+
+// Add a reference to an existing cache. Note that the underlying
+// RocksDB cache is shared between the original and new reference.
+DBCache* DBRefCache(DBCache *cache);
+
+// Release a cache, decrementing the reference count on the underlying
+// RocksDB cache. Note that the RocksDB cache will not be freed until
+// all of the references have been released.
+void DBReleaseCache(DBCache *cache);
+
 // Opens the database located in "dir", creating it if it doesn't
 // exist.
 DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions options);

storage/engine/rocksdb_test.go (4 additions, 4 deletions)

@@ -38,7 +38,7 @@ const testCacheSize = 1 << 30 // 1 GB
 func TestMinMemtableBudget(t *testing.T) {
     defer leaktest.AfterTest(t)()
 
-    rocksdb := NewRocksDB(roachpb.Attributes{}, ".", 0, 0, 0, stop.NewStopper())
+    rocksdb := NewRocksDB(roachpb.Attributes{}, ".", RocksDBCache{}, 0, 0, stop.NewStopper())
     const expected = "memtable budget must be at least"
     if err := rocksdb.Open(); !testutils.IsError(err, expected) {
         t.Fatalf("expected %s, but got %v", expected, err)
@@ -231,7 +231,7 @@ func openRocksDBWithVersion(t *testing.T, hasVersionFile bool, ver Version) erro
         }
     }
 
-    rocksdb := NewRocksDB(roachpb.Attributes{}, dir, 0, minMemtableBudget, 0, stopper)
+    rocksdb := NewRocksDB(roachpb.Attributes{}, dir, RocksDBCache{}, minMemtableBudget, 0, stopper)
     return rocksdb.Open()
 }
 
@@ -253,7 +253,7 @@ func TestCheckpoint(t *testing.T) {
     stopper := stop.NewStopper()
     defer stopper.Stop()
 
-    db := NewRocksDB(roachpb.Attributes{}, dir, 0, minMemtableBudget, 0, stopper)
+    db := NewRocksDB(roachpb.Attributes{}, dir, RocksDBCache{}, minMemtableBudget, 0, stopper)
     if err := db.Open(); err != nil {
         t.Fatal(err)
     }
@@ -280,7 +280,7 @@ func TestCheckpoint(t *testing.T) {
     defer stopper.Stop()
 
     dir = filepath.Join(dir, "checkpoint")
-    db := NewRocksDB(roachpb.Attributes{}, dir, 0, minMemtableBudget, 0, stopper)
+    db := NewRocksDB(roachpb.Attributes{}, dir, RocksDBCache{}, minMemtableBudget, 0, stopper)
     if err := db.Open(); err != nil {
         t.Fatal(err)
     }
