Skip to content

Commit

Permalink
[WIP] engine: Implement Engine, Iterator interfaces for Pebble
Browse files Browse the repository at this point in the history
This change implements the interfaces defined in
`pkg/storage/engine/engine.go` for Pebble, in particular `Engine`,
`Iterator`, and related ones. Also ports the MVCC Scanner to Go
for use in `iter.MVCC{Get,Scan}`.

Other related changes elsewhere in the codebase to let an incomplete
Engine implementer (i.e. without WithSSTables, and without a compactor)
run otherwise.

Also adds a parameter in the `--store` command line flag to specify
what engine to use. `--store=engine=pebble,path=node1` starts a Pebble
based store.

Fixes cockroachdb#39674.

TODO:
- [ ] Refactor tests to run for both RocksDB and Pebble Engines, instead
of one-or-the-other
- [ ] Add more test coverage, especially around batches
- [ ] Function signature comments
- [ ] Comments comments comments

Release note: None
  • Loading branch information
itsbilal committed Sep 26, 2019
1 parent 12c640b commit 5fd1f8d
Show file tree
Hide file tree
Showing 8 changed files with 2,106 additions and 87 deletions.
7 changes: 7 additions & 0 deletions pkg/base/store_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type StoreSpec struct {
Path string
Size SizeSpec
InMemory bool
Engine EngineType
Attributes roachpb.Attributes
// UseFileRegistry is true if the "file registry" store version is desired.
// This is set by CCL code when encryption-at-rest is in use.
Expand Down Expand Up @@ -308,6 +309,12 @@ func NewStoreSpec(value string) (StoreSpec, error) {
} else {
return StoreSpec{}, fmt.Errorf("%s is not a valid store type", value)
}
case "engine":
if value == "pebble" {
ss.Engine = EngineTypePebble
} else {
ss.Engine = EngineTypeRocksDB
}
case "rocksdb":
ss.RocksDBOptions = value
default:
Expand Down
46 changes: 34 additions & 12 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/pebble"
pebbleCache "github.com/cockroachdb/pebble/cache"
"github.com/elastic/gosigar"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -479,19 +481,39 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {

details = append(details, fmt.Sprintf("store %d: RocksDB, max size %s, max open file limit %d",
i, humanizeutil.IBytes(sizeInBytes), openFileLimitPerStore))
rocksDBConfig := engine.RocksDBConfig{
Attrs: spec.Attributes,
Dir: spec.Path,
MaxSizeBytes: sizeInBytes,
MaxOpenFiles: openFileLimitPerStore,
WarnLargeBatchThreshold: 500 * time.Millisecond,
Settings: cfg.Settings,
UseFileRegistry: spec.UseFileRegistry,
RocksDBOptions: spec.RocksDBOptions,
ExtraOptions: spec.ExtraOptions,
}

eng, err := engine.NewRocksDB(rocksDBConfig, cache)
var eng engine.Engine
var err error
if spec.Engine == base.EngineTypePebble {
pebbleOpts := &pebble.Options{
Cache: pebbleCache.New(128 << 20),
MaxOpenFiles: int(openFileLimitPerStore),
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 400,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: []pebble.LevelOptions{{
BlockSize: 32 << 10,
}},
}
eng, err = engine.NewPebble(spec.Path, pebbleOpts)
} else {
rocksDBConfig := engine.RocksDBConfig{
Attrs: spec.Attributes,
Dir: spec.Path,
MaxSizeBytes: sizeInBytes,
MaxOpenFiles: openFileLimitPerStore,
WarnLargeBatchThreshold: 500 * time.Millisecond,
Settings: cfg.Settings,
UseFileRegistry: spec.UseFileRegistry,
RocksDBOptions: spec.RocksDBOptions,
ExtraOptions: spec.ExtraOptions,
}

eng, err = engine.NewRocksDB(rocksDBConfig, cache)
}
if err != nil {
return Engines{}, err
}
Expand Down
46 changes: 0 additions & 46 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

const (
Expand Down Expand Up @@ -1884,51 +1883,6 @@ func (m mvccKeyFormatter) Format(f fmt.State, c rune) {
m.key.Format(f, c)
}

// MVCCComparer is a pebble.Comparer object that implements MVCC-specific
// comparator settings for use with Pebble.
//
// TODO(itsbilal): Move this to a new file pebble.go.
var MVCCComparer = &pebble.Comparer{
Compare: MVCCKeyCompare,
AbbreviatedKey: func(k []byte) uint64 {
key, _, ok := enginepb.SplitMVCCKey(k)
if !ok {
return 0
}
return pebble.DefaultComparer.AbbreviatedKey(key)
},

Format: func(k []byte) fmt.Formatter {
decoded, err := DecodeMVCCKey(k)
if err != nil {
return mvccKeyFormatter{err: err}
}
return mvccKeyFormatter{key: decoded}
},

Separator: func(dst, a, b []byte) []byte {
return append(dst, a...)
},

Successor: func(dst, a []byte) []byte {
return append(dst, a...)
},
Split: func(k []byte) int {
if len(k) == 0 {
return len(k)
}
// This is similar to what enginepb.SplitMVCCKey does.
tsLen := int(k[len(k)-1])
keyPartEnd := len(k) - 1 - tsLen
if keyPartEnd < 0 {
return len(k)
}
return keyPartEnd
},

Name: "cockroach_comparator",
}

// MVCCMerge implements a merge operation. Merge adds integer values,
// concatenates undifferentiated byte slice values, and efficiently
// combines time series observations if the roachpb.Value tag value
Expand Down
65 changes: 49 additions & 16 deletions pkg/storage/engine/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/gogo/protobuf/proto"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -81,7 +83,19 @@ var (
// createTestEngine returns a new in-memory engine with 1MB of storage
// capacity.
func createTestEngine() Engine {
return NewInMem(roachpb.Attributes{}, 1<<20)
//return NewInMem(roachpb.Attributes{}, 1<<20)
return createTestPebbleEngine()
}

// createTestPebbleEngine returns a new in-memory Pebble storage engine.
func createTestPebbleEngine() Engine {
peb, err := NewPebble("", &pebble.Options{
FS: vfs.NewMem(),
})
if err != nil {
return nil
}
return peb
}

// makeTxn creates a new transaction using the specified base
Expand Down Expand Up @@ -389,7 +403,7 @@ func TestMVCCPutWithTxn(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestEngine()
engine := createTestPebbleEngine()
defer engine.Close()

if err := MVCCPut(ctx, engine, nil, testKey1, txn1.OrigTimestamp, value1, txn1); err != nil {
Expand All @@ -398,11 +412,12 @@ func TestMVCCPutWithTxn(t *testing.T) {

for _, ts := range []hlc.Timestamp{{Logical: 1}, {Logical: 2}, {WallTime: 1}} {
value, _, err := MVCCGet(ctx, engine, testKey1, ts, MVCCGetOptions{Txn: txn1})

if err != nil {
t.Fatal(err)
}
if !bytes.Equal(value1.RawBytes, value.RawBytes) {
t.Fatalf("the value %s in get result does not match the value %s in request",
t.Fatalf("the value %+v in get result does not match the value %+v in request",
value1.RawBytes, value.RawBytes)
}
}
Expand All @@ -412,7 +427,7 @@ func TestMVCCPutWithoutTxn(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestEngine()
engine := createTestPebbleEngine()
defer engine.Close()

err := MVCCPut(ctx, engine, nil, testKey1, hlc.Timestamp{Logical: 1}, value1, nil)
Expand Down Expand Up @@ -1677,13 +1692,7 @@ func TestMVCCInvalidateIterator(t *testing.T) {
}
}

func TestMVCCScan(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestEngine()
defer engine.Close()

func mvccScanTest(t *testing.T, ctx context.Context, engine Engine) {
if err := MVCCPut(ctx, engine, nil, testKey1, hlc.Timestamp{WallTime: 1}, value1, nil); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1772,13 +1781,33 @@ func TestMVCCScan(t *testing.T) {
}
}

func TestMVCCScanMaxNum(t *testing.T) {
func TestMVCCScan(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestEngine()
defer engine.Close()

mvccScanTest(t, ctx, engine)
}

func TestMVCCScanPebble(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestPebbleEngine()
defer engine.Close()

mvccScanTest(t, ctx, engine)
}

func TestMVCCScanMaxNum(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
engine := createTestPebbleEngine()
defer engine.Close()

if err := MVCCPut(ctx, engine, nil, testKey1, hlc.Timestamp{WallTime: 1}, value1, nil); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -5805,8 +5834,10 @@ func TestMVCCTimeSeriesPartialMerge(t *testing.T) {
}

if i == 1 {
if err := engine.(InMem).Compact(); err != nil {
t.Fatal(err)
if eng, ok := engine.(InMem); ok {
if err := eng.Compact(); err != nil {
t.Fatal(err)
}
}
}

Expand All @@ -5818,8 +5849,10 @@ func TestMVCCTimeSeriesPartialMerge(t *testing.T) {
}

if i == 1 {
if err := engine.(InMem).Compact(); err != nil {
t.Fatal(err)
if eng, ok := engine.(InMem); ok {
if err := eng.Compact(); err != nil {
t.Fatal(err)
}
}
}

Expand Down
Loading

0 comments on commit 5fd1f8d

Please sign in to comment.