Skip to content

Commit

Permalink
engine: Implement Engine, Iterator interfaces for Pebble
Browse files Browse the repository at this point in the history
This change implements the interfaces defined in
`pkg/storage/engine/engine.go` for Pebble, in particular `Engine`,
`Iterator`, and related ones. Also ports the MVCC Scanner to Go
for use in `iter.MVCC{Get,Scan}`.

Other related changes elsewhere in the codebase to let an incomplete
Engine implementer (i.e. without WithSSTables, and without a compactor)
run otherwise.

Also adds a parameter in the `--store` command line flag to specify
what engine to use. `--store=engine=pebble,path=node1` starts a Pebble
based store.

NOTE: High risk change, do not merge this for 19.2.

Fixes cockroachdb#39674.

TODO:
- [ ] Add more test coverage, especially around batches

Release note: None
  • Loading branch information
itsbilal committed Oct 1, 2019
1 parent d1f5ea4 commit 5dfdd35
Show file tree
Hide file tree
Showing 11 changed files with 2,145 additions and 151 deletions.
7 changes: 7 additions & 0 deletions pkg/base/store_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type StoreSpec struct {
Path string
Size SizeSpec
InMemory bool
Engine EngineType
Attributes roachpb.Attributes
// UseFileRegistry is true if the "file registry" store version is desired.
// This is set by CCL code when encryption-at-rest is in use.
Expand Down Expand Up @@ -308,6 +309,12 @@ func NewStoreSpec(value string) (StoreSpec, error) {
} else {
return StoreSpec{}, fmt.Errorf("%s is not a valid store type", value)
}
case "engine":
if value == "pebble" {
ss.Engine = EngineTypePebble
} else {
ss.Engine = EngineTypeRocksDB
}
case "rocksdb":
ss.RocksDBOptions = value
default:
Expand Down
46 changes: 34 additions & 12 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/pebble"
pebbleCache "github.com/cockroachdb/pebble/cache"
"github.com/elastic/gosigar"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -479,19 +481,39 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {

details = append(details, fmt.Sprintf("store %d: RocksDB, max size %s, max open file limit %d",
i, humanizeutil.IBytes(sizeInBytes), openFileLimitPerStore))
rocksDBConfig := engine.RocksDBConfig{
Attrs: spec.Attributes,
Dir: spec.Path,
MaxSizeBytes: sizeInBytes,
MaxOpenFiles: openFileLimitPerStore,
WarnLargeBatchThreshold: 500 * time.Millisecond,
Settings: cfg.Settings,
UseFileRegistry: spec.UseFileRegistry,
RocksDBOptions: spec.RocksDBOptions,
ExtraOptions: spec.ExtraOptions,
}

eng, err := engine.NewRocksDB(rocksDBConfig, cache)
var eng engine.Engine
var err error
if spec.Engine == base.EngineTypePebble {
pebbleOpts := &pebble.Options{
Cache: pebbleCache.New(128 << 20),
MaxOpenFiles: int(openFileLimitPerStore),
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 400,
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: []pebble.LevelOptions{{
BlockSize: 32 << 10,
}},
}
eng, err = engine.NewPebble(spec.Path, pebbleOpts)
} else {
rocksDBConfig := engine.RocksDBConfig{
Attrs: spec.Attributes,
Dir: spec.Path,
MaxSizeBytes: sizeInBytes,
MaxOpenFiles: openFileLimitPerStore,
WarnLargeBatchThreshold: 500 * time.Millisecond,
Settings: cfg.Settings,
UseFileRegistry: spec.UseFileRegistry,
RocksDBOptions: spec.RocksDBOptions,
ExtraOptions: spec.ExtraOptions,
}

eng, err = engine.NewRocksDB(rocksDBConfig, cache)
}
if err != nil {
return Engines{}, err
}
Expand Down
133 changes: 87 additions & 46 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"sync"
"time"

Expand All @@ -23,11 +25,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/dustin/go-humanize"
"github.com/elastic/gosigar"
)

const (
Expand Down Expand Up @@ -1884,51 +1888,6 @@ func (m mvccKeyFormatter) Format(f fmt.State, c rune) {
m.key.Format(f, c)
}

// MVCCComparer is a pebble.Comparer object that implements MVCC-specific
// comparator settings for use with Pebble.
//
// TODO(itsbilal): Move this to a new file pebble.go.
var MVCCComparer = &pebble.Comparer{
Compare: MVCCKeyCompare,
AbbreviatedKey: func(k []byte) uint64 {
key, _, ok := enginepb.SplitMVCCKey(k)
if !ok {
return 0
}
return pebble.DefaultComparer.AbbreviatedKey(key)
},

Format: func(k []byte) fmt.Formatter {
decoded, err := DecodeMVCCKey(k)
if err != nil {
return mvccKeyFormatter{err: err}
}
return mvccKeyFormatter{key: decoded}
},

Separator: func(dst, a, b []byte) []byte {
return append(dst, a...)
},

Successor: func(dst, a []byte) []byte {
return append(dst, a...)
},
Split: func(k []byte) int {
if len(k) == 0 {
return len(k)
}
// This is similar to what enginepb.SplitMVCCKey does.
tsLen := int(k[len(k)-1])
keyPartEnd := len(k) - 1 - tsLen
if keyPartEnd < 0 {
return len(k)
}
return keyPartEnd
},

Name: "cockroach_comparator",
}

// MVCCMerge implements a merge operation. Merge adds integer values,
// concatenates undifferentiated byte slice values, and efficiently
// combines time series observations if the roachpb.Value tag value
Expand Down Expand Up @@ -3288,3 +3247,85 @@ func ComputeStatsGo(
ms.LastUpdateNanos = nowNanos
return ms, nil
}

// computeCapacity returns capacity details for the engine's available storage,
// by querying the underlying file system.
func computeCapacity(path string, maxSizeBytes int64) (roachpb.StoreCapacity, error) {
fileSystemUsage := gosigar.FileSystemUsage{}
dir := path
if dir == "" {
// This is an in-memory instance. Pretend we're empty since we
// don't know better and only use this for testing. Using any
// part of the actual file system here can throw off allocator
// rebalancing in a hard-to-trace manner. See #7050.
return roachpb.StoreCapacity{
Capacity: maxSizeBytes,
Available: maxSizeBytes,
}, nil
}
if err := fileSystemUsage.Get(dir); err != nil {
return roachpb.StoreCapacity{}, err
}

if fileSystemUsage.Total > math.MaxInt64 {
return roachpb.StoreCapacity{}, fmt.Errorf("unsupported disk size %s, max supported size is %s",
humanize.IBytes(fileSystemUsage.Total), humanizeutil.IBytes(math.MaxInt64))
}
if fileSystemUsage.Avail > math.MaxInt64 {
return roachpb.StoreCapacity{}, fmt.Errorf("unsupported disk size %s, max supported size is %s",
humanize.IBytes(fileSystemUsage.Avail), humanizeutil.IBytes(math.MaxInt64))
}
fsuTotal := int64(fileSystemUsage.Total)
fsuAvail := int64(fileSystemUsage.Avail)

// Find the total size of all the files in the r.dir and all its
// subdirectories.
var totalUsedBytes int64
if errOuter := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if err != nil {
// This can happen if rocksdb removes files out from under us - just keep
// going to get the best estimate we can.
if os.IsNotExist(err) {
return nil
}
// Special-case: if the store-dir is configured using the root of some fs,
// e.g. "/mnt/db", we might have special fs-created files like lost+found
// that we can't read, so just ignore them rather than crashing.
if os.IsPermission(err) && filepath.Base(path) == "lost+found" {
return nil
}
return err
}
if info.Mode().IsRegular() {
totalUsedBytes += info.Size()
}
return nil
}); errOuter != nil {
return roachpb.StoreCapacity{}, errOuter
}

// If no size limitation have been placed on the store size or if the
// limitation is greater than what's available, just return the actual
// totals.
if maxSizeBytes == 0 || maxSizeBytes >= fsuTotal || path == "" {
return roachpb.StoreCapacity{
Capacity: fsuTotal,
Available: fsuAvail,
Used: totalUsedBytes,
}, nil
}

available := maxSizeBytes - totalUsedBytes
if available > fsuAvail {
available = fsuAvail
}
if available < 0 {
available = 0
}

return roachpb.StoreCapacity{
Capacity: maxSizeBytes,
Available: available,
Used: totalUsedBytes,
}, nil
}
Loading

0 comments on commit 5dfdd35

Please sign in to comment.