From 33a7ee7b21297e6afaf9998138cce9965658a5fd Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 28 Aug 2019 19:47:17 -0400 Subject: [PATCH] engine: Implement Engine, Iterator interfaces for Pebble This change implements the interfaces defined in `pkg/storage/engine/engine.go` for Pebble, in particular `Engine`, `Iterator`, and related ones. Also ports the MVCC Scanner to Go for use in `iter.MVCC{Get,Scan}`. Other related changes elsewhere in the codebase to let an incomplete Engine implementer (i.e. without WithSSTables, and without a compactor) run otherwise. Also adds a parameter in the `--store` command line flag to specify what engine to use. `--store=engine=pebble,path=node1` starts a Pebble based store. NOTE: High risk change, do not merge this for 19.2. Fixes #39674. TODO: - [ ] Add more test coverage, especially around batches Release note: None --- pkg/base/store_spec.go | 7 + pkg/cli/debug.go | 10 +- pkg/server/config.go | 54 +- pkg/storage/engine/mvcc.go | 133 +++-- pkg/storage/engine/pebble.go | 627 ++++++++++++++++++++++ pkg/storage/engine/pebble_batch.go | 334 ++++++++++++ pkg/storage/engine/pebble_iterator.go | 467 ++++++++++++++++ pkg/storage/engine/pebble_mvcc_scanner.go | 608 +++++++++++++++++++++ pkg/storage/engine/rocksdb.go | 83 +-- pkg/storage/replica_application_result.go | 4 + pkg/storage/replica_destroy.go | 5 +- pkg/storage/store.go | 27 +- 12 files changed, 2195 insertions(+), 164 deletions(-) create mode 100644 pkg/storage/engine/pebble.go create mode 100644 pkg/storage/engine/pebble_batch.go create mode 100644 pkg/storage/engine/pebble_iterator.go create mode 100644 pkg/storage/engine/pebble_mvcc_scanner.go diff --git a/pkg/base/store_spec.go b/pkg/base/store_spec.go index f109277a8047..3eeb880f0ef4 100644 --- a/pkg/base/store_spec.go +++ b/pkg/base/store_spec.go @@ -161,6 +161,7 @@ type StoreSpec struct { Path string Size SizeSpec InMemory bool + Engine EngineType Attributes roachpb.Attributes // UseFileRegistry is true if the "file registry" store version is desired. // This is set by CCL code when encryption-at-rest is in use. @@ -308,6 +309,12 @@ func NewStoreSpec(value string) (StoreSpec, error) { } else { return StoreSpec{}, fmt.Errorf("%s is not a valid store type", value) } + case "engine": + if value == "pebble" { + ss.Engine = EngineTypePebble + } else { + ss.Engine = EngineTypeRocksDB + } case "rocksdb": ss.RocksDBOptions = value default: diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index b64834f8e1ff..95a2586827a6 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -53,7 +53,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/sysutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/tool" "github.com/gogo/protobuf/jsonpb" "github.com/kr/pretty" @@ -1332,14 +1331,7 @@ func init() { // To be able to read Cockroach-written RocksDB manifests/SSTables, comparator // and merger functions must be specified to pebble that match the ones used // to write those files. - // - // TODO(itsbilal): Port the Cockroach merger over from libroach/merge.cc to go - // and use that here. Until this happens, some data (eg. timeseries) will be - // printed incorrectly by this tool: it will be concatenated instead of being - // properly merged. - merger := *pebble.DefaultMerger - merger.Name = "cockroach_merge_operator" - pebbleTool.RegisterMerger(&merger) + pebbleTool.RegisterMerger(engine.MVCCMerger) pebbleTool.RegisterComparer(engine.MVCCComparer) debugPebbleCmd.AddCommand(pebbleTool.Commands...) DebugCmd.AddCommand(debugPebbleCmd) diff --git a/pkg/server/config.go b/pkg/server/config.go index 106b55032223..c0191ae572a1 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -34,16 +34,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/pebble" "github.com/elastic/gosigar" "github.com/pkg/errors" ) // Context defaults. const ( - // DefaultCacheSize is the default size of the RocksDB cache. We default the - // cache size and SQL memory pool size to 128 MiB. Larger values might - // provide significantly better performance, but we're not sure what type of - // system we're running on (development or production or some shared + // DefaultCacheSize is the default size of the RocksDB and Pebble caches. We + // default the cache size and SQL memory pool size to 128 MiB. Larger values + // might provide significantly better performance, but we're not sure what + // type of system we're running on (development or production or some shared // environment). Production users should almost certainly override these // settings and we'll warn in the logs about doing so. DefaultCacheSize = 128 << 20 // 128 MB @@ -479,19 +480,40 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { details = append(details, fmt.Sprintf("store %d: RocksDB, max size %s, max open file limit %d", i, humanizeutil.IBytes(sizeInBytes), openFileLimitPerStore)) - rocksDBConfig := engine.RocksDBConfig{ - Attrs: spec.Attributes, - Dir: spec.Path, - MaxSizeBytes: sizeInBytes, - MaxOpenFiles: openFileLimitPerStore, - WarnLargeBatchThreshold: 500 * time.Millisecond, - Settings: cfg.Settings, - UseFileRegistry: spec.UseFileRegistry, - RocksDBOptions: spec.RocksDBOptions, - ExtraOptions: spec.ExtraOptions, - } - eng, err := engine.NewRocksDB(rocksDBConfig, cache) + var eng engine.Engine + var err error + if spec.Engine == base.EngineTypePebble { + // TODO(itsbilal): Tune these options, and allow them to be overridden + // in the spec (similar to the existing spec.RocksDBOptions and others). + pebbleOpts := &pebble.Options{ + MaxOpenFiles: int(openFileLimitPerStore), + MemTableSize: 64 << 20, + MemTableStopWritesThreshold: 4, + MinFlushRate: 4 << 20, + L0CompactionThreshold: 2, + L0StopWritesThreshold: 400, + LBaseMaxBytes: 64 << 20, // 64 MB + Levels: []pebble.LevelOptions{{ + BlockSize: 32 << 10, + }}, + } + eng, err = engine.NewPebble(spec.Path, pebbleOpts) + } else { + rocksDBConfig := engine.RocksDBConfig{ + Attrs: spec.Attributes, + Dir: spec.Path, + MaxSizeBytes: sizeInBytes, + MaxOpenFiles: openFileLimitPerStore, + WarnLargeBatchThreshold: 500 * time.Millisecond, + Settings: cfg.Settings, + UseFileRegistry: spec.UseFileRegistry, + RocksDBOptions: spec.RocksDBOptions, + ExtraOptions: spec.ExtraOptions, + } + + eng, err = engine.NewRocksDB(rocksDBConfig, cache) + } if err != nil { return Engines{}, err } diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 2924165c6d45..ee18782c60ad 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -15,6 +15,8 @@ import ( "context" "fmt" "math" + "os" + "path/filepath" "sync" "time" @@ -23,11 +25,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/pebble" + "github.com/dustin/go-humanize" + "github.com/elastic/gosigar" ) const ( @@ -1884,51 +1888,6 @@ func (m mvccKeyFormatter) Format(f fmt.State, c rune) { m.key.Format(f, c) } -// MVCCComparer is a pebble.Comparer object that implements MVCC-specific -// comparator settings for use with Pebble. -// -// TODO(itsbilal): Move this to a new file pebble.go. -var MVCCComparer = &pebble.Comparer{ - Compare: MVCCKeyCompare, - AbbreviatedKey: func(k []byte) uint64 { - key, _, ok := enginepb.SplitMVCCKey(k) - if !ok { - return 0 - } - return pebble.DefaultComparer.AbbreviatedKey(key) - }, - - Format: func(k []byte) fmt.Formatter { - decoded, err := DecodeMVCCKey(k) - if err != nil { - return mvccKeyFormatter{err: err} - } - return mvccKeyFormatter{key: decoded} - }, - - Separator: func(dst, a, b []byte) []byte { - return append(dst, a...) - }, - - Successor: func(dst, a []byte) []byte { - return append(dst, a...) - }, - Split: func(k []byte) int { - if len(k) == 0 { - return len(k) - } - // This is similar to what enginepb.SplitMVCCKey does. - tsLen := int(k[len(k)-1]) - keyPartEnd := len(k) - 1 - tsLen - if keyPartEnd < 0 { - return len(k) - } - return keyPartEnd - }, - - Name: "cockroach_comparator", -} - // MVCCMerge implements a merge operation. Merge adds integer values, // concatenates undifferentiated byte slice values, and efficiently // combines time series observations if the roachpb.Value tag value @@ -3288,3 +3247,85 @@ func ComputeStatsGo( ms.LastUpdateNanos = nowNanos return ms, nil } + +// computeCapacity returns capacity details for the engine's available storage, +// by querying the underlying file system. +func computeCapacity(path string, maxSizeBytes int64) (roachpb.StoreCapacity, error) { + fileSystemUsage := gosigar.FileSystemUsage{} + dir := path + if dir == "" { + // This is an in-memory instance. Pretend we're empty since we + // don't know better and only use this for testing. Using any + // part of the actual file system here can throw off allocator + // rebalancing in a hard-to-trace manner. See #7050. + return roachpb.StoreCapacity{ + Capacity: maxSizeBytes, + Available: maxSizeBytes, + }, nil + } + if err := fileSystemUsage.Get(dir); err != nil { + return roachpb.StoreCapacity{}, err + } + + if fileSystemUsage.Total > math.MaxInt64 { + return roachpb.StoreCapacity{}, fmt.Errorf("unsupported disk size %s, max supported size is %s", + humanize.IBytes(fileSystemUsage.Total), humanizeutil.IBytes(math.MaxInt64)) + } + if fileSystemUsage.Avail > math.MaxInt64 { + return roachpb.StoreCapacity{}, fmt.Errorf("unsupported disk size %s, max supported size is %s", + humanize.IBytes(fileSystemUsage.Avail), humanizeutil.IBytes(math.MaxInt64)) + } + fsuTotal := int64(fileSystemUsage.Total) + fsuAvail := int64(fileSystemUsage.Avail) + + // Find the total size of all the files in the r.dir and all its + // subdirectories. + var totalUsedBytes int64 + if errOuter := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { + if err != nil { + // This can happen if rocksdb removes files out from under us - just keep + // going to get the best estimate we can. + if os.IsNotExist(err) { + return nil + } + // Special-case: if the store-dir is configured using the root of some fs, + // e.g. "/mnt/db", we might have special fs-created files like lost+found + // that we can't read, so just ignore them rather than crashing. + if os.IsPermission(err) && filepath.Base(path) == "lost+found" { + return nil + } + return err + } + if info.Mode().IsRegular() { + totalUsedBytes += info.Size() + } + return nil + }); errOuter != nil { + return roachpb.StoreCapacity{}, errOuter + } + + // If no size limitation have been placed on the store size or if the + // limitation is greater than what's available, just return the actual + // totals. + if maxSizeBytes == 0 || maxSizeBytes >= fsuTotal || path == "" { + return roachpb.StoreCapacity{ + Capacity: fsuTotal, + Available: fsuAvail, + Used: totalUsedBytes, + }, nil + } + + available := maxSizeBytes - totalUsedBytes + if available > fsuAvail { + available = fsuAvail + } + if available < 0 { + available = 0 + } + + return roachpb.StoreCapacity{ + Capacity: maxSizeBytes, + Available: available, + Used: totalUsedBytes, + }, nil +} diff --git a/pkg/storage/engine/pebble.go b/pkg/storage/engine/pebble.go new file mode 100644 index 000000000000..48abb2fb2d2c --- /dev/null +++ b/pkg/storage/engine/pebble.go @@ -0,0 +1,627 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package engine + +import "C" + +import ( + "context" + "fmt" + "io/ioutil" + "sync" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/cache" + "github.com/cockroachdb/pebble/vfs" + "github.com/pkg/errors" +) + +// MVCCComparer is a pebble.Comparer object that implements MVCC-specific +// comparator settings for use with Pebble. +var MVCCComparer = &pebble.Comparer{ + Compare: MVCCKeyCompare, + + AbbreviatedKey: func(k []byte) uint64 { + key, _, ok := enginepb.SplitMVCCKey(k) + if !ok { + return 0 + } + return pebble.DefaultComparer.AbbreviatedKey(key) + }, + + Format: func(k []byte) fmt.Formatter { + decoded, err := DecodeMVCCKey(k) + if err != nil { + return mvccKeyFormatter{err: err} + } + return mvccKeyFormatter{key: decoded} + }, + + Separator: func(dst, a, b []byte) []byte { + return append(dst, a...) + }, + + Successor: func(dst, a []byte) []byte { + return append(dst, a...) + }, + + Split: func(k []byte) int { + if len(k) == 0 { + return len(k) + } + // This is similar to what enginepb.SplitMVCCKey does. + tsLen := int(k[len(k)-1]) + keyPartEnd := len(k) - 1 - tsLen + if keyPartEnd < 0 { + return len(k) + } + return keyPartEnd + }, + + Name: "cockroach_comparator", +} + +// MVCCMerger is a pebble.Merger object that implements the merge operator used +// by Cockroach. +var MVCCMerger = &pebble.Merger{ + Name: "cockroach_merge_operator", + + Merge: func(key, oldValue, newValue, buf []byte) []byte { + // TODO(itsbilal): Port the merge operator from C++ to Go. + // Until then, call the C++ merge operator directly. + ret, err := goMerge(oldValue, newValue) + if err != nil { + return nil + } + return ret + }, +} + +// pebbleCache is a struct that manages a shared Pebble cache instance as well +// as does ref counts. +type pebbleCache struct { + sync.Mutex + + handle *cache.Cache + refs int +} + +var cacheSingleton pebbleCache + +// get returns a handle to a shared pebble cache. +func (p *pebbleCache) get() *cache.Cache { + p.Lock() + defer p.Unlock() + + if p.handle == nil { + // Size copied over from server.DefaultCacheSize. + // + // TODO(itsbilal): Investigate whether DefaultCacheSize works well in this + // case. + p.handle = cache.New(128 << 20) + } + + p.refs++ + return p.handle +} + +// release is called to denote a pebble cache object is no longer in use. +func (p *pebbleCache) release() { + p.Lock() + defer p.Unlock() + + p.refs-- + + if p.refs <= 0 { + // Let the handle get GCed. + p.handle = nil + } +} + +// Pebble is a wrapper around a Pebble database instance. +type Pebble struct { + db *pebble.DB + + closed bool + path string + + // Relevant options copied over from pebble.Options. + fs vfs.FS + readOnly bool +} + +var _ Engine = &Pebble{} + +// NewPebble creates a new Pebble instance, at the specified path. +func NewPebble(path string, cfg *pebble.Options) (*Pebble, error) { + cfg.Comparer = MVCCComparer + cfg.Merger = MVCCMerger + cfg.Cache = cacheSingleton.get() + + // pebble.Open also calls EnsureDefaults, but only after doing a clone. Call + // EnsureDefaults beforehand so we have a matching cfg here for when we save + // cfg.FS and cfg.ReadOnly later on. + cfg.EnsureDefaults() + + db, err := pebble.Open(path, cfg) + if err != nil { + return nil, err + } + + return &Pebble{ + db: db, + closed: false, + path: path, + fs: cfg.FS, + readOnly: cfg.ReadOnly, + }, nil +} + +// Close implements the Engine interface. +func (p *Pebble) Close() { + p.closed = true + + if p.readOnly { + // Don't close the underlying handle; the non-ReadOnly instance will handle + // that. + return + } + _ = p.db.Close() + cacheSingleton.release() +} + +// Closed implements the Engine interface. +func (p *Pebble) Closed() bool { + return p.closed +} + +// Get implements the Engine interface. +func (p *Pebble) Get(key MVCCKey) ([]byte, error) { + ret, err := p.db.Get(EncodeKey(key)) + if err == pebble.ErrNotFound || len(ret) == 0 { + return nil, nil + } + return ret, err +} + +// GetProto implements the Engine interface. +func (p *Pebble) GetProto(key MVCCKey, msg protoutil.Message) (ok bool, keyBytes, valBytes int64, err error) { + val, err := p.Get(key) + if err != nil || val == nil { + return false, 0, 0, err + } + + ok = true + err = protoutil.Unmarshal(val, msg) + keyBytes = int64(key.Len()) + valBytes = int64(len(val)) + return +} + +// Iterate implements the Engine interface. +func (p *Pebble) Iterate(start, end MVCCKey, f func(MVCCKeyValue) (stop bool, err error)) error { + if !start.Less(end) { + return nil + } + + it := newPebbleIterator(p.db, IterOptions{UpperBound: end.Key}) + defer it.Close() + + it.Seek(start) + for ; ; it.Next() { + ok, err := it.Valid() + if err != nil { + return err + } else if !ok { + break + } + + k := it.Key() + if !k.Less(end) { + break + } + if done, err := f(MVCCKeyValue{Key: k, Value: it.Value()}); done || err != nil { + return err + } + } + return nil +} + +// NewIterator implements the Engine interface. +func (p *Pebble) NewIterator(opts IterOptions) Iterator { + iter := newPebbleIterator(p.db, opts) + if iter == nil { + panic("couldn't create a new iterator") + } + return iter +} + +// ApplyBatchRepr implements the Engine interface. +func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + batch := p.db.NewBatch() + if err := batch.SetRepr(repr); err != nil { + return err + } + + opts := pebble.NoSync + if sync { + opts = pebble.Sync + } + return batch.Commit(opts) +} + +// Clear implements the Engine interface. +func (p *Pebble) Clear(key MVCCKey) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + return p.db.Delete(EncodeKey(key), pebble.Sync) +} + +// SingleClear implements the Engine interface. +func (p *Pebble) SingleClear(key MVCCKey) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + return p.db.SingleDelete(EncodeKey(key), pebble.Sync) +} + +// ClearRange implements the Engine interface. +func (p *Pebble) ClearRange(start, end MVCCKey) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + bufStart := EncodeKey(start) + bufEnd := EncodeKey(end) + return p.db.DeleteRange(bufStart, bufEnd, pebble.Sync) +} + +// ClearIterRange implements the Engine interface. +func (p *Pebble) ClearIterRange(iter Iterator, start, end MVCCKey) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + pebbleIter, ok := iter.(*pebbleIterator) + if !ok { + return errors.Errorf("%T is not a Pebble iterator", iter) + } + pebbleIter.Seek(start) + for ; ; pebbleIter.Next() { + ok, err := pebbleIter.Valid() + if err != nil { + return err + } else if !ok { + break + } + + err = p.db.Delete(pebbleIter.iter.Key(), pebble.Sync) + if err != nil { + return err + } + } + + return nil +} + +// Merge implements the Engine interface. +func (p *Pebble) Merge(key MVCCKey, value []byte) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + return p.db.Merge(EncodeKey(key), value, pebble.Sync) +} + +// Put implements the Engine interface. +func (p *Pebble) Put(key MVCCKey, value []byte) error { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + + return p.db.Set(EncodeKey(key), value, pebble.Sync) +} + +// LogData implements the Engine interface. +func (p *Pebble) LogData(data []byte) error { + return p.db.LogData(data, pebble.Sync) +} + +// LogLogicalOp implements the Engine interface. +func (p *Pebble) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) { + // No-op. Logical logging disabled. +} + +// Attrs implements the Engine interface. +func (p *Pebble) Attrs() roachpb.Attributes { + // TODO(itsbilal): Implement this. + return roachpb.Attributes{} +} + +// Capacity implements the Engine interface. +func (p *Pebble) Capacity() (roachpb.StoreCapacity, error) { + // Pebble doesn't have a capacity limiting parameter, so pass 0 for + // maxSizeBytes to denote no limit. + return computeCapacity(p.path, 0) +} + +// Flush implements the Engine interface. +func (p *Pebble) Flush() error { + return p.db.Flush() +} + +// GetStats implements the Engine interface. +func (p *Pebble) GetStats() (*Stats, error) { + // TODO(itsbilal): Implement this. + return &Stats{}, nil +} + +// GetEnvStats implements the Engine interface. +func (p *Pebble) GetEnvStats() (*EnvStats, error) { + // TODO(itsbilal): Implement this. + return &EnvStats{}, nil +} + +// GetAuxiliaryDir implements the Engine interface. +func (p *Pebble) GetAuxiliaryDir() string { + // Suggest an auxiliary subdirectory within the pebble data path. + return p.fs.PathJoin(p.path, "auxiliary") +} + +// NewBatch implements the Engine interface. +func (p *Pebble) NewBatch() Batch { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + return newPebbleBatch(p.db.NewIndexedBatch()) +} + +// NewReadOnly implements the Engine interface. +func (p *Pebble) NewReadOnly() ReadWriter { + peb := &Pebble{ + db: p.db, + closed: false, + path: p.path, + readOnly: true, + fs: p.fs, + } + return peb +} + +// NewWriteOnlyBatch implements the Engine interface. +func (p *Pebble) NewWriteOnlyBatch() Batch { + if p.readOnly { + panic("write operation called on read-only pebble instance") + } + return newPebbleBatch(p.db.NewBatch()) +} + +// NewSnapshot implements the Engine interface. +func (p *Pebble) NewSnapshot() Reader { + return &pebbleSnapshot{ + snapshot: p.db.NewSnapshot(), + } +} + +// IngestExternalFiles implements the Engine interface. +func (p *Pebble) IngestExternalFiles(ctx context.Context, paths []string, skipWritingSeqNo, allowFileModifications bool) error { + return p.db.Ingest(paths) +} + +// PreIngestDelay implements the Engine interface. +func (p *Pebble) PreIngestDelay(_ context.Context) { + // This is a RocksDB-ism. Pebble takes care of any ingestion-induced waits. + return +} + +// ApproximateDiskBytes implements the Engine interface. +func (p *Pebble) ApproximateDiskBytes(from, to roachpb.Key) (uint64, error) { + count := uint64(0) + _ = p.Iterate(MVCCKey{from, hlc.Timestamp{}}, MVCCKey{to, hlc.Timestamp{}}, func (kv MVCCKeyValue) (bool, error) { + count += uint64(kv.Key.Len() + len(kv.Value)) + return false, nil + }) + return count, nil +} + +// CompactRange implements the Engine interface. +func (p *Pebble) CompactRange(start, end roachpb.Key, forceBottommost bool) error { + bufStart := EncodeKey(MVCCKey{start, hlc.Timestamp{}}) + bufEnd := EncodeKey(MVCCKey{end, hlc.Timestamp{}}) + return p.db.Compact(bufStart, bufEnd) +} + +// OpenFile implements the Engine interface. +func (p *Pebble) OpenFile(filename string) (DBFile, error) { + file, err := p.fs.Open(p.fs.PathJoin(p.path, filename)) + if err != nil { + return nil, err + } + + pebbleFile := &pebbleFile{ + file: file, + } + return pebbleFile, nil +} + +// ReadFile implements the Engine interface. +func (p *Pebble) ReadFile(filename string) ([]byte, error) { + file, err := p.fs.Open(p.fs.PathJoin(p.path, filename)) + if err != nil { + return nil, err + } + defer file.Close() + + return ioutil.ReadAll(file) +} + +// DeleteFile implements the Engine interface. +func (p *Pebble) DeleteFile(filename string) error { + return p.fs.Remove(filename) +} + +// DeleteDirAndFiles implements the Engine interface. +func (p *Pebble) DeleteDirAndFiles(dir string) error { + files, err := p.fs.List(dir) + if err != nil { + return err + } + + // Recurse through all files, calling DeleteFile or DeleteDirAndFiles as + // appropriate. + for _, filename := range files { + path := p.fs.PathJoin(dir, filename) + stat, err := p.fs.Stat(path); + if err != nil { + return err + } + + if stat.IsDir() { + err = p.DeleteDirAndFiles(path) + } else { + err = p.DeleteFile(path) + } + if err != nil { + return err + } + } + return nil +} + +// LinkFile implements the Engine interface. +func (p *Pebble) LinkFile(oldname, newname string) error { + oldPath := p.fs.PathJoin(p.path, oldname) + newPath := p.fs.PathJoin(p.path, newname) + return p.fs.Link(oldPath, newPath) +} + +// CreateCheckpoint implements the Engine interface. +func (p *Pebble) CreateCheckpoint(_ string) error { + // No-op for now: Pebble does not implement checkpoints. + // + // TODO(itsbilal): Update this when Pebble implements checkpoints: + // https://github.com/cockroachdb/pebble/issues/304 + return nil +} + +// pebbleFile wraps a pebble File and implements the DBFile interface. +type pebbleFile struct { + file vfs.File +} + +var _ DBFile = &pebbleFile{} + +// Append implements the DBFile interface. +func (p *pebbleFile) Append(data []byte) error { + _, err := p.file.Write(data) + return err +} + +// Close implements the DBFile interface. +func (p *pebbleFile) Close() error { + return p.file.Close() +} + +// Close implements the DBFile interface. +func (p *pebbleFile) Sync() error { + return p.file.Sync() +} + +// pebbleSnapshot represents a snapshot created using Pebble.NewSnapshot(). +type pebbleSnapshot struct { + snapshot *pebble.Snapshot + closed bool +} + +var _ Reader = &pebbleSnapshot{} + +// Close implements the Reader interface. +func (p *pebbleSnapshot) Close() { + p.snapshot.Close() + p.closed = true +} + +// Closed implements the Reader interface. +func (p *pebbleSnapshot) Closed() bool { + return p.closed +} + +// Get implements the Reader interface. +func (p *pebbleSnapshot) Get(key MVCCKey) ([]byte, error) { + ret, err := p.snapshot.Get(EncodeKey(key)) + if err == pebble.ErrNotFound || len(ret) == 0 { + return nil, nil + } + return ret, err +} + +// GetProto implements the Reader interface. +func (p *pebbleSnapshot) GetProto(key MVCCKey, msg protoutil.Message) (ok bool, keyBytes, valBytes int64, err error) { + var val []byte + val, err = p.snapshot.Get(EncodeKey(key)) + if err != nil || val == nil { + return + } + + ok = true + err = protoutil.Unmarshal(val, msg) + keyBytes = int64(key.Len()) + valBytes = int64(len(val)) + return +} + +// Iterate implements the Reader interface. +func (p *pebbleSnapshot) Iterate(start, end MVCCKey, f func(MVCCKeyValue) (stop bool, err error)) error { + if p.closed { + return errors.New("cannot call Iterate on a closed batch") + } + if !start.Less(end) { + return nil + } + + it := p.NewIterator(IterOptions{UpperBound: end.Key}) + defer it.Close() + + it.Seek(start) + for ; ; it.Next() { + ok, err := it.Valid() + if err != nil { + return err + } else if !ok { + break + } + + k := it.Key() + if !k.Less(end) { + break + } + if done, err := f(MVCCKeyValue{Key: k, Value: it.Value()}); done || err != nil { + return err + } + } + return nil +} + +// NewIterator implements the Reader interface. +func (p pebbleSnapshot) NewIterator(opts IterOptions) Iterator { + return newPebbleIterator(p.snapshot, opts) +} diff --git a/pkg/storage/engine/pebble_batch.go b/pkg/storage/engine/pebble_batch.go new file mode 100644 index 000000000000..15e79b834b58 --- /dev/null +++ b/pkg/storage/engine/pebble_batch.go @@ -0,0 +1,334 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package engine + +import ( + "sync" + + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" +) + +// Wrapper struct around a pebble.Batch. +type pebbleBatch struct { + batch *pebble.Batch + buf []byte + iter pebbleBatchIterator + closed bool + isDistinct bool + distinctOpen bool + parentBatch *pebbleBatch +} + +var _ Batch = &pebbleBatch{} + +var pebbleBatchPool = sync.Pool{ + New: func() interface{} { + return &pebbleBatch{} + }, +} + +// Instantiates a new pebbleBatch. +func newPebbleBatch(batch *pebble.Batch) *pebbleBatch { + pb := pebbleBatchPool.Get().(*pebbleBatch) + *pb = pebbleBatch{ + batch: batch, + closed: false, + } + + return pb +} + +// Close implements the Batch interface. +func (p *pebbleBatch) Close() { + if p.iter.iter != nil { + p.iter.iter.Close() + p.iter.destroy() + } + if !p.isDistinct { + p.batch.Close() + p.batch = nil + } else { + p.parentBatch.distinctOpen = false + p.isDistinct = false + } + p.closed = true + pebbleBatchPool.Put(p) +} + +// Closed implements the Batch interface. +func (p *pebbleBatch) Closed() bool { + return p.closed +} + +// Get implements the Batch interface. +func (p *pebbleBatch) Get(key MVCCKey) ([]byte, error) { + p.buf = EncodeKeyToBuf(p.buf[:0], key) + ret, err := p.batch.Get(p.buf) + if err == pebble.ErrNotFound || len(ret) == 0 { + return nil, nil + } + return ret, err +} + +// GetProto implements the Batch interface. +func (p *pebbleBatch) GetProto(key MVCCKey, msg protoutil.Message) (ok bool, keyBytes, valBytes int64, err error) { + p.buf = EncodeKeyToBuf(p.buf[:0], key) + val, err := p.batch.Get(p.buf) + if err != nil || val == nil { + return false, 0, 0, err + } + + ok = true + err = protoutil.Unmarshal(val, msg) + keyBytes = int64(len(p.buf)) + valBytes = int64(len(val)) + return +} + +// Iterate implements the Batch interface. +func (p *pebbleBatch) Iterate(start, end MVCCKey, f func(MVCCKeyValue) (stop bool, err error)) error { + if p.closed { + return errors.New("cannot call Iterate on a closed batch") + } + + if !start.Less(end) { + return nil + } + + it := p.NewIterator(IterOptions{UpperBound: end.Key}) + defer it.Close() + + it.Seek(start) + for ; ; it.Next() { + ok, err := it.Valid() + if err != nil { + return err + } else if !ok { + break + } + + k := it.Key() + if !k.Less(end) { + break + } + if done, err := f(MVCCKeyValue{Key: k, Value: it.Value()}); done || err != nil { + return err + } + } + return nil +} + +// NewIterator implements the Batch interface. +func (p *pebbleBatch) NewIterator(opts IterOptions) Iterator { + if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 { + panic("iterator must set prefix or upper bound or lower bound") + } + + // Use the cached iterator. + // + // TODO(itsbilal): Investigate if it's equally or more efficient to just call + // newPebbleIterator with p.batch as the handle, instead of caching an + // iterator in pebbleBatch. This would clean up some of the oddities around + // pebbleBatchIterator.Close() (which doesn't close the underlying pebble + // Iterator), vs pebbleIterator.Close(), and the way memory is managed for + // the two iterators. + if p.iter.batch != nil { + panic("iterator already in use") + } else if p.iter.iter != nil { + p.iter.iter.Close() + } + + p.iter.init(p.batch, opts) + p.iter.batch = p + return &p.iter +} + +// NewIterator implements the Batch interface. +func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error { + if p.distinctOpen { + panic("distinct batch open") + } + + var batch pebble.Batch + if err := batch.SetRepr(repr); err != nil { + return err + } + + return p.batch.Apply(&batch, nil) +} + +// Clear implements the Batch interface. +func (p *pebbleBatch) Clear(key MVCCKey) error { + if p.distinctOpen { + panic("distinct batch open") + } + + p.buf = EncodeKeyToBuf(p.buf[:0], key) + return p.batch.Delete(p.buf, nil) +} + +// SingleClear implements the Batch interface. +func (p *pebbleBatch) SingleClear(key MVCCKey) error { + if p.distinctOpen { + panic("distinct batch open") + } + + p.buf = EncodeKeyToBuf(p.buf[:0], key) + return p.batch.SingleDelete(p.buf, nil) +} + +// ClearRange implements the Batch interface. +func (p *pebbleBatch) ClearRange(start, end MVCCKey) error { + if p.distinctOpen { + panic("distinct batch open") + } + + p.buf = EncodeKeyToBuf(p.buf[:0], start) + buf2 := EncodeKey(end) + return p.batch.DeleteRange(p.buf, buf2, nil) +} + +// Clear implements the Batch interface. +func (p *pebbleBatch) ClearIterRange(iter Iterator, start, end MVCCKey) error { + if p.distinctOpen { + panic("distinct batch open") + } + + iter.SetUpperBound(end.Key) + iter.Seek(start) + + for ; ; iter.Next() { + valid, err := iter.Valid() + if err != nil { + return err + } else if !valid { + break + } + + p.buf = EncodeKeyToBuf(p.buf[:0], iter.Key()) + err = p.batch.Delete(p.buf, nil) + if err != nil { + return err + } + } + return nil +} + +// Merge implements the Batch interface. +func (p *pebbleBatch) Merge(key MVCCKey, value []byte) error { + if p.distinctOpen { + panic("distinct batch open") + } + + p.buf = EncodeKeyToBuf(p.buf[:0], key) + return p.batch.Merge(p.buf, value, nil) +} + +// Put implements the Batch interface. +func (p *pebbleBatch) Put(key MVCCKey, value []byte) error { + if p.distinctOpen { + panic("distinct batch open") + } + + p.buf = EncodeKeyToBuf(p.buf[:0], key) + return p.batch.Set(p.buf, value, nil) +} + +// LogData implements the Batch interface. +func (p *pebbleBatch) LogData(data []byte) error { + return p.batch.LogData(data, nil) +} + +func (p *pebbleBatch) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails) { + // No-op. +} + +// Commit implements the Batch interface. +func (p *pebbleBatch) Commit(sync bool) error { + opts := pebble.NoSync + if sync { + opts = pebble.Sync + } + if (p.batch == nil) { + panic("called with nil batch") + } + err := p.batch.Commit(opts) + if err != nil { + panic(err) + } + return err +} + +// Distinct implements the Batch interface. +func (p *pebbleBatch) Distinct() ReadWriter { + // Distinct batches are regular batches with isDistinct set to true. + // The parent batch is stored in parentBatch, and all writes on it are + // disallowed while the distinct batch is open. Both the distinct batch and + // the parent batch share the same underlying pebble.Batch instance. + // + // TODO(itsbilal): Investigate if we need to distinguish between distinct + // and non-distinct batches. + batch := &pebbleBatch{} + batch.batch = p.batch + batch.isDistinct = true + p.distinctOpen = true + batch.parentBatch = p + + return batch +} + +// Empty implements the Batch interface. +func (p *pebbleBatch) Empty() bool { + return p.batch.Count() == 0 +} + +// Len implements the Batch interface. +func (p *pebbleBatch) Len() int { + return len(p.batch.Repr()) +} + +// Repr implements the Batch interface. +func (p *pebbleBatch) Repr() []byte { + return p.batch.Repr() +} + +// pebbleBatchIterator extends pebbleIterator and is meant to be embedded inside +// a pebbleBatch. +type pebbleBatchIterator struct { + pebbleIterator + batch *pebbleBatch +} + +// Close implements the Iterator interface. There are two notable differences +// from pebbleIterator.Close: 1. don't close the underlying p.iter (this is done +// when the batch is closed), and 2. don't release the pebbleIterator back into +// pebbleIterPool, since this memory is managed by pebbleBatch instead. +func (p *pebbleBatchIterator) Close() { + if p.batch == nil { + panic("closing idle iterator") + } + p.batch = nil +} + +// destory resets all fields in a pebbleBatchIterator, while holding onto +// some buffers to reduce allocations down the line. Assumes the underlying +// pebble.Iterator has been closed already. +func (p *pebbleBatchIterator) destroy() { + *p = pebbleBatchIterator{ + pebbleIterator: pebbleIterator{ + lowerBoundBuf: p.lowerBoundBuf, + upperBoundBuf: p.upperBoundBuf, + }, + batch: nil, + } +} diff --git a/pkg/storage/engine/pebble_iterator.go b/pkg/storage/engine/pebble_iterator.go new file mode 100644 index 000000000000..453fd52a2328 --- /dev/null +++ b/pkg/storage/engine/pebble_iterator.go @@ -0,0 +1,467 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package engine + +import ( + "bytes" + "sync" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" + "golang.org/x/tools/container/intsets" +) + +// pebbleIterator is a wrapper around a pebble.Iterator that implements the +// Iterator interface. +type pebbleIterator struct { + // DB handle. + parent *Pebble + // Underlying iterator for the DB. + iter *pebble.Iterator + options pebble.IterOptions + // Reusable buffer for MVCC key encoding. + keyBuf []byte + // Buffers for copying iterator bounds to. Note that the underlying memory + // is not GCed upon Close(), to reduce the number of overall allocations. + lowerBoundBuf []byte + upperBoundBuf []byte + // Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips + // SSTables based on MVCC key when true. + prefix bool + // MVCC Scanner struct to store variables for one call to MVCCGet / MVCCScan. + mvccScanner pebbleMVCCScanner +} + +var _ Iterator = &pebbleIterator{} + +var pebbleIterPool = sync.Pool{ + New: func() interface{} { + return &pebbleIterator{} + }, +} + +// Instantiates a new Pebble iterator, or gets one from the pool. +func newPebbleIterator(handle pebble.Reader, opts IterOptions) Iterator { + iter := pebbleIterPool.Get().(*pebbleIterator) + iter.init(handle, opts) + return iter +} + +// init resets this pebbleIterator for use with the specified arguments. The +// current instance could either be a cached iterator (eg. in pebbleBatch), or +// a newly-instantiated one through newPebbleIterator. +func (p *pebbleIterator) init(handle pebble.Reader, opts IterOptions) { + *p = pebbleIterator{ + lowerBoundBuf: p.lowerBoundBuf, + upperBoundBuf: p.upperBoundBuf, + prefix: opts.Prefix, + } + + if !opts.Prefix && len(opts.UpperBound) == 0 && len(opts.LowerBound) == 0 { + panic("iterator must set prefix or upper bound or lower bound") + } + + if opts.LowerBound != nil { + // This is the same as + // p.options.LowerBound = EncodeKeyToBuf(p.lowerBoundBuf[:0], MVCCKey{Key: opts.LowerBound}) . + // Since we are encoding zero-timestamp MVCC Keys anyway, we can just append + // the null byte instead of calling EncodeKey which will do the same thing. + p.lowerBoundBuf = append(p.lowerBoundBuf[:0], opts.LowerBound...) + p.options.LowerBound = append(p.lowerBoundBuf, 0x00) + } + if opts.UpperBound != nil { + // Same as above. + p.upperBoundBuf = append(p.upperBoundBuf[:0], opts.UpperBound...) + p.options.UpperBound = append(p.upperBoundBuf, 0x00) + } + + p.iter = handle.NewIter(&p.options) + if p.iter == nil { + panic("unable to create iterator") + } +} + +// Close implements the Iterator interface. +func (p *pebbleIterator) Close() { + if p.iter != nil { + err := p.iter.Close() + if err != nil { + panic(err) + } + p.iter = nil + } + // Reset all fields except for the lower/upper bound buffers. Holding onto + // their underlying memory is more efficient to prevent extra allocations + // down the line. + *p = pebbleIterator{ + lowerBoundBuf: p.lowerBoundBuf, + upperBoundBuf: p.upperBoundBuf, + } + + pebbleIterPool.Put(p) +} + +// Seek implements the Iterator interface. +func (p *pebbleIterator) Seek(key MVCCKey) { + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], key) + if (p.prefix) { + p.iter.SeekPrefixGE(p.keyBuf) + } else { + p.iter.SeekGE(p.keyBuf) + } +} + +// Valid implements the Iterator interface. +func (p *pebbleIterator) Valid() (bool, error) { + return p.iter.Valid(), p.iter.Error() +} + +// Next implements the Iterator interface. +func (p *pebbleIterator) Next() { + p.iter.Next() +} + +// NextKey implements the Iterator interface. +func (p *pebbleIterator) NextKey() { + if valid, err := p.Valid(); err != nil || !valid { + return + } + p.keyBuf = append(p.keyBuf[:0], p.UnsafeKey().Key...) + + for p.iter.Next() { + if nextKey := p.UnsafeKey(); !bytes.Equal(p.keyBuf, nextKey.Key) { + break + } + } +} + +// UnsafeKey implements the Iterator interface. +func (p *pebbleIterator) UnsafeKey() MVCCKey { + if valid, err := p.Valid(); err != nil || !valid { + return MVCCKey{} + } + + mvccKey, err := DecodeMVCCKey(p.iter.Key()) + if err != nil { + return MVCCKey{} + } + + return mvccKey +} + +// UnsafeValue implements the Iterator interface. +func (p *pebbleIterator) UnsafeValue() []byte { + if valid, err := p.Valid(); err != nil || !valid { + return nil + } + return p.iter.Value() +} + +// SeekReverse implements the Iterator interface. +func (p *pebbleIterator) SeekReverse(key MVCCKey) { + // Do a SeekGE, not a SeekLT. This is because SeekReverse seeks to the + // greatest key that's less than or equal to the specified key. + p.Seek(key) + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], key) + + // The new key could either be greater or equal to the supplied key. + // Backtrack one step if it is greater. + comp := MVCCKeyCompare(p.keyBuf, p.iter.Key()) + if comp < 0 && p.iter.Valid() { + p.Prev() + } +} + +// Prev implements the Iterator interface. +func (p *pebbleIterator) Prev() { + p.iter.Prev() +} + +// PrevKey implements the Iterator interface. +func (p *pebbleIterator) PrevKey() { + if valid, err := p.Valid(); err != nil || !valid { + return + } + curKey := p.Key() + for p.iter.Prev() { + if nextKey := p.UnsafeKey(); !bytes.Equal(curKey.Key, nextKey.Key) { + break + } + } +} + +// Key implements the Iterator interface. +func (p *pebbleIterator) Key() MVCCKey { + key := p.UnsafeKey() + keyCopy := make([]byte, len(key.Key)) + copy(keyCopy, key.Key) + key.Key = keyCopy + return key +} + +// Value implements the Iterator interface. +func (p *pebbleIterator) Value() []byte { + value := p.UnsafeValue() + valueCopy := make([]byte, len(value)) + copy(valueCopy, value) + return valueCopy +} + +// ValueProto implements the Iterator interface. +func (p *pebbleIterator) ValueProto(msg protoutil.Message) error { + value := p.UnsafeValue() + + return msg.Unmarshal(value) +} + +// ComputeStats implements the Iterator interface. +func (p *pebbleIterator) ComputeStats(start, end MVCCKey, nowNanos int64) (enginepb.MVCCStats, error) { + return ComputeStatsGo(p, start, end, nowNanos) +} + +// Go-only version of IsValidSplitKey. Checks if the specified key is in +// NoSplitSpans. +func isValidSplitKey(key roachpb.Key) bool { + for _, noSplitSpan := range keys.NoSplitSpans { + if noSplitSpan.ContainsKey(key) { + return false + } + } + return true +} + +// FindSplitKey implements the Iterator interface. +func (p *pebbleIterator) FindSplitKey(start, end, minSplitKey MVCCKey, targetSize int64) (MVCCKey, error) { + const ( + timestampLen = int64(12) + ) + p.Seek(start) + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], end) + minSplitKeyBuf := EncodeKey(minSplitKey) + prevKey := make([]byte, 0) + + sizeSoFar := int64(0) + bestDiff := int64(intsets.MaxInt) + bestSplitKey := MVCCKey{} + + for ;p.iter.Valid() && MVCCKeyCompare(p.iter.Key(), p.keyBuf) < 0; p.iter.Next() { + mvccKey, err := DecodeMVCCKey(p.iter.Key()) + if err != nil { + return MVCCKey{}, err + } + + diff := targetSize - sizeSoFar + if diff < 0 { + diff = -diff + } + if diff < bestDiff && MVCCKeyCompare(p.iter.Key(), minSplitKeyBuf) >= 0 && isValidSplitKey(mvccKey.Key) { + // We are going to have to copy bestSplitKey, since by the time we find + // out it's the actual best split key, the underlying slice would have + // changed (due to the iter.Next() call). + bestDiff = diff + bestSplitKey.Key = append(bestSplitKey.Key[:0], mvccKey.Key...) + bestSplitKey.Timestamp = mvccKey.Timestamp + } + if diff > bestDiff && bestSplitKey.Key != nil { + break + } + + sizeSoFar += int64(len(p.iter.Value())) + if mvccKey.IsValue() && bytes.Equal(prevKey, mvccKey.Key) { + // We only advanced timestamps, but not new mvcc keys. + sizeSoFar += timestampLen + } else { + sizeSoFar += int64(len(mvccKey.Key) + 1) + if mvccKey.IsValue() { + sizeSoFar += timestampLen + } + } + + prevKey = append(prevKey[:0], mvccKey.Key...) + } + + return bestSplitKey, nil +} + + +// MVCCGet implements the Iterator interface. +func (p *pebbleIterator) MVCCGet( + key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, +) (value *roachpb.Value, intent *roachpb.Intent, err error) { + if opts.Inconsistent && opts.Txn != nil { + return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") + } + if len(key) == 0 { + return nil, nil, emptyKeyError() + } + if p.iter == nil { + panic("uninitialized iterator") + } + + // MVCCGet is implemented as an MVCCScan with an end key that sorts after the + // start key. + keyEnd := make([]byte, 0, len(key) + 1) + keyEnd = append(keyEnd, key...) + keyEnd = append(keyEnd, 0x00) + + p.mvccScanner = pebbleMVCCScanner{ + parent: p.iter, + start: key, + end: keyEnd, + ts: timestamp, + maxKeys: 1, + inconsistent: opts.Inconsistent, + tombstones: opts.Tombstones, + ignoreSeq: opts.IgnoreSequence, + } + + if opts.Txn != nil { + p.mvccScanner.txn = opts.Txn + p.mvccScanner.checkUncertainty = timestamp.Less(opts.Txn.MaxTimestamp) + } + + p.mvccScanner.init() + p.mvccScanner.get() + + // Init calls SetBounds. Reset it to what this iterator had at the start. + defer func() { + if p.iter != nil { + p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound) + } + }() + + if p.mvccScanner.err != nil { + return nil, nil, p.mvccScanner.err + } + intents, err := buildScanIntents(p.mvccScanner.intents.Repr()) + if err != nil { + return nil, nil, err + } + if !opts.Inconsistent && len(intents) > 0 { + return nil, nil, &roachpb.WriteIntentError{Intents: intents} + } + + if len(intents) > 1 { + return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents)) + } else if len(intents) == 1 { + intent = &intents[0] + } + + if len(p.mvccScanner.results.repr) == 0 { + return nil, intent, nil + } + + mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(p.mvccScanner.results.repr) + if err != nil { + return nil, nil, err + } + + value = &roachpb.Value{ + RawBytes: rawValue, + Timestamp: mvccKey.Timestamp, + } + return +} + +// MVCCScan implements the Iterator interface. +func (p *pebbleIterator) MVCCScan( + start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions, +) (kvData []byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { + if opts.Inconsistent && opts.Txn != nil { + return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction") + } + if len(end) == 0 { + return nil, 0, nil, nil, emptyKeyError() + } + if max == 0 { + resumeSpan = &roachpb.Span{Key: start, EndKey: end} + return nil, 0, resumeSpan, nil, nil + } + if p.iter == nil { + panic("uninitialized iterator") + } + + p.mvccScanner = pebbleMVCCScanner{ + parent: p.iter, + reverse: opts.Reverse, + start: start, + end: end, + ts: timestamp, + maxKeys: max, + inconsistent: opts.Inconsistent, + tombstones: opts.Tombstones, + ignoreSeq: opts.IgnoreSequence, + } + + if opts.Txn != nil { + p.mvccScanner.txn = opts.Txn + p.mvccScanner.checkUncertainty = timestamp.Less(opts.Txn.MaxTimestamp) + } + + p.mvccScanner.init() + p.mvccScanner.scan() + + // Init calls SetBounds. Reset it to what this iterator had at the start. + defer func() { + if p.iter != nil { + p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound) + } + }() + + if p.mvccScanner.err != nil { + return nil, 0, nil, nil, p.mvccScanner.err + } + + kvData = p.mvccScanner.results.repr + numKVs = p.mvccScanner.results.count + + if p.mvccScanner.curKey != nil { + if opts.Reverse { + resumeSpan = &roachpb.Span{ + Key: p.mvccScanner.start, + EndKey: p.mvccScanner.curKey, + } + resumeSpan.EndKey = resumeSpan.EndKey.Next() + } else { + resumeSpan = &roachpb.Span{ + Key: p.mvccScanner.curKey, + EndKey: p.mvccScanner.end, + } + } + } + intents, err = buildScanIntents(p.mvccScanner.intents.Repr()) + if err != nil { + return nil, 0, nil, nil, err + } + + if !opts.Inconsistent && len(intents) > 0 { + return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents} + } + return +} + +// SetUpperBound implements the Iterator interface. +func (p *pebbleIterator) SetUpperBound(upperBound roachpb.Key) { + p.upperBoundBuf = append(p.upperBoundBuf[:0], upperBound...) + p.options.UpperBound = append(p.upperBoundBuf, 0x00) + p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound) +} + +// Stats implements the Iterator interface. +func (p *pebbleIterator) Stats() IteratorStats { + // TODO(itsbilal): Implement this. + panic("implement me") +} diff --git a/pkg/storage/engine/pebble_mvcc_scanner.go b/pkg/storage/engine/pebble_mvcc_scanner.go new file mode 100644 index 000000000000..a2710f3e71b6 --- /dev/null +++ b/pkg/storage/engine/pebble_mvcc_scanner.go @@ -0,0 +1,608 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package engine + +import ( + "bytes" + "encoding/binary" + "sort" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" +) + +const ( + maxItersBeforeSeek = 10 +) + +// Struct to store MVCCScan / MVCCGet in the same binary format as that +// expected by MVCCScanDecodeKeyValue. +type pebbleResults struct { + count int64 + repr []byte +} + +// The repr that MVCCScan / MVCCGet expects to provide as output goes: +// +// This function adds to repr in that format. +func (p *pebbleResults) put(key []byte, value []byte) { + // Key value lengths take up 8 bytes (2 x Uint32). + const kvLenSize = 8 + + startIdx := len(p.repr) + lenToAdd := kvLenSize + len(key) + len(value) + + if len(p.repr) + lenToAdd <= cap(p.repr) { + p.repr = p.repr[:len(p.repr)+lenToAdd] + } else { + oldRepr := p.repr + p.repr = make([]byte, len(oldRepr)+lenToAdd) + copy(p.repr, oldRepr) + } + + binary.LittleEndian.PutUint32(p.repr[startIdx:], uint32(len(value))) + binary.LittleEndian.PutUint32(p.repr[startIdx+4:], uint32(len(key))) + copy(p.repr[startIdx+kvLenSize:], key) + copy(p.repr[startIdx+kvLenSize+len(key):], value) + p.count++ +} + +// Go port of mvccScanner in libroach/mvcc.h. Stores all variables relating to +// one MVCCGet / MVCCScan call. +type pebbleMVCCScanner struct { + parent *pebble.Iterator + reverse bool + // Iteration bounds. Does not contain MVCC timestamp. + start, end roachpb.Key + // Buffers for encoding iteration bounds. + startBuf, endBuf []byte + // Timestamp with which MVCCSCan/MVCCGet was called. + ts hlc.Timestamp + // Max number of keys to return. + maxKeys int64 + // Reference to transaction record, could be nil. + txn *roachpb.Transaction + // Metadata object for unmarshalling intents. + meta enginepb.MVCCMetadata + inconsistent, tombstones bool + ignoreSeq, checkUncertainty bool + keyBuf, savedBuf []byte + // cur* variables store the "current" record we're pointing to. Updated in + // updateCurrent. Note that curRawKey = the full encoded MVCC key, while + // curKey = the user-key part of curRawKey (i.e. excluding the timestamp). + curRawKey, curKey, curValue []byte + curTS hlc.Timestamp + results pebbleResults + intents pebble.Batch + // Stores any error returned. If non-nil, iteration short circuits. + err error + // Number of iterations to try before we do a Seek/SeekReverse. Stays within + // [1, maxItersBeforeSeek] and defaults to maxItersBeforeSeek/2 . + itersBeforeSeek int +} + +// init sets bounds on the underlying pebble iterator, and initializes other +// fields not set by the calling method. +func (p *pebbleMVCCScanner) init() { + p.itersBeforeSeek = maxItersBeforeSeek / 2 + + mvccStartKey := MVCCKey{p.start, hlc.Timestamp{}} + mvccEndKey := MVCCKey{p.end, hlc.Timestamp{}} + p.startBuf = EncodeKeyToBuf(p.startBuf[:0], mvccStartKey) + p.endBuf = EncodeKeyToBuf(p.endBuf[:0], mvccEndKey) + p.parent.SetBounds(p.startBuf, p.endBuf) +} + +// seekReverse seeks to the latest revision of the key before the specified key. +func (p *pebbleMVCCScanner) seekReverse(key roachpb.Key) { + mvccKey := MVCCKey{key, hlc.Timestamp{}} + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], mvccKey) + p.parent.SeekGE(p.keyBuf) + + if !p.parent.Valid() { + // We might be past the end. Seek to the end key. + p.parent.SeekLT(p.endBuf) + } + + if p.parent.Valid() && MVCCKeyCompare(p.parent.Key(), p.keyBuf) >= 0 { + p.parent.Prev() + } + + if !p.parent.Valid() { + return + } + + mvccKey, err := DecodeMVCCKey(p.parent.Key()) + if err != nil { + p.err = nil + return + } + // Seek to the earliest revision of mvccKey. + mvccKey.Timestamp = hlc.Timestamp{} + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], mvccKey) + p.parent.SeekGE(p.keyBuf) + p.updateCurrent() +} + +// seek seeks to the latest revision of the specified key (or a greater key). +func (p *pebbleMVCCScanner) seek(key roachpb.Key) { + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], MVCCKey{key, hlc.Timestamp{}}) + p.parent.SeekGE(p.keyBuf) + p.updateCurrent() +} + +// scan iterates until maxKeys records are in results, or the underlying +// iterator is exhausted, or an error is encountered. +func (p *pebbleMVCCScanner) scan() { + if p.reverse { + p.seekReverse(p.end) + } else { + p.seek(p.start) + } + + for p.results.count < p.maxKeys && p.getAndAdvance() { + } + + if p.results.count < p.maxKeys || !p.parent.Valid() { + // Either the iterator was exhausted or an error was encountered. This + // means there's no point in having a resumeSpan. Set all current variables + // to their zero values so the caller doesn't create a resumeSpan. + p.curRawKey = nil + p.curKey = nil + p.curTS = hlc.Timestamp{} + } +} + +// get iterates exactly once and adds one KV to the result set. +func (p *pebbleMVCCScanner) get() { + p.seek(p.start) + p.getAndAdvance() +} + +// Increments itersBeforeSeek while ensuring it stays <= maxItersBeforeSeek +func (p *pebbleMVCCScanner) incrementItersBeforeSeek() { + p.itersBeforeSeek++ + if p.itersBeforeSeek > maxItersBeforeSeek { + p.itersBeforeSeek = maxItersBeforeSeek + } +} + +// Decrements itersBeforeSeek while ensuring it stays positive. +func (p *pebbleMVCCScanner) decrementItersBeforeSeek() { + p.itersBeforeSeek-- + if p.itersBeforeSeek < 1 { + p.itersBeforeSeek = 1 + } +} + +// Updates cur{RawKey, Key, TS} to match record the iterator is pointing to. +func (p *pebbleMVCCScanner) updateCurrent() { + if !p.parent.Valid() { + return + } + + p.curRawKey = append(p.curRawKey[:0], p.parent.Key()...) + p.curValue = append(p.curValue[:0], p.parent.Value()...) + + mvccKey, err := DecodeMVCCKey(p.curRawKey) + if err != nil { + p.err = err + return + } + + p.curKey = mvccKey.Key + p.curTS = mvccKey.Timestamp +} + +// Advance to the next key in the iterator's direction. +func (p *pebbleMVCCScanner) advanceKey() { + if p.reverse { + p.prevKey() + } else { + p.nextKey() + } +} + +// Advance to the newest iteration of the previous user key (where user key = +// part of the MVCC Key that precedes the timestamp). +func (p *pebbleMVCCScanner) prevKey() { + iterCount := p.itersBeforeSeek + mvccKey, err := DecodeMVCCKey(p.parent.Key()) + gotToPrevious := false + + for iterCount >= 0 && p.parent.Valid() && err == nil && bytes.Compare(mvccKey.Key , p.curKey) == 0 { + p.parent.Prev() + mvccKey, err = DecodeMVCCKey(p.parent.Key()) + iterCount-- + + if err == nil && bytes.Compare(mvccKey.Key, p.curKey) != 0 && !gotToPrevious { + // We've backed up to the previous key, but not its latest revision. + // Update current then keep going until we get to the latest version of + // that key. + gotToPrevious = true + p.updateCurrent() + } + } + + if err != nil { + p.err = err + return + } + + if bytes.Compare(mvccKey.Key, p.curKey) == 0 { + // We have to seek. + if !gotToPrevious { + // Seek to the latest revision of the key before p.curKey. + p.seekReverse(p.curKey) + } else { + // p.curKey is already one key before where it was at the start of this + // function. Just seek to the latest revision of it. + p.seek(p.curKey) + } + + p.decrementItersBeforeSeek() + return + } else if gotToPrevious { + // We made it to the record preceding the record we're finding, since we + // encountered two changes in mvccKey.Key. Go to the next key to get to the + // correct record. + p.parent.Next() + } + + p.incrementItersBeforeSeek() + p.updateCurrent() +} + +// Advances to the next user key. +func (p *pebbleMVCCScanner) nextKey() { + if !p.parent.Valid() { + return + } + + iterCount := p.itersBeforeSeek + mvccKey, err := DecodeMVCCKey(p.parent.Key()) + + for iterCount >= 0 && p.parent.Valid() && err == nil && bytes.Compare(mvccKey.Key , p.curKey) == 0 { + p.parent.Next() + mvccKey, err = DecodeMVCCKey(p.parent.Key()) + } + + if err != nil { + p.err = err + return + } + + if bytes.Compare(mvccKey.Key, p.curKey) == 0 { + // We have to seek. Append a null byte to the current key. Note that + // appending to p.curKey could invalidate p.curRawKey, since p.curKey is + // usually a sub-slice of the latter. But if we're just seeking to the next + // key right afterward (seek calls updateCurrent), this is not an issue. + p.curKey = append(p.curKey, 0x00) + p.seek(p.curKey) + + p.decrementItersBeforeSeek() + return + } + + p.incrementItersBeforeSeek() + p.updateCurrent() +} + +// Seeks to the latest revision of the current key that's still less than or +// equal to the specified timestamp, adds it to the result set, then moves onto +// the next user key. +func (p *pebbleMVCCScanner) seekVersion(ts hlc.Timestamp, uncertaintyCheck bool) { + mvccKey := MVCCKey{Key: p.curKey, Timestamp: ts} + p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], mvccKey) + origKey := p.keyBuf[:len(p.curKey)] + + iterCount := p.itersBeforeSeek + for iterCount >= 0 && MVCCKeyCompare(p.curRawKey, p.keyBuf) < 0 { + if !p.parent.Valid() { + // For reverse iterations, go back a key. + if p.reverse { + p.prevKey() + } + return + } + p.parent.Next() + p.updateCurrent() + iterCount-- + } + + if iterCount < 0 { + p.parent.SeekGE(p.keyBuf) + p.updateCurrent() + + p.decrementItersBeforeSeek() + return + } else { + p.incrementItersBeforeSeek() + } + + if !p.parent.Valid() { + // For reverse iterations, go back a key. + if p.reverse { + p.prevKey() + } + return + } + if bytes.Compare(p.curKey, origKey) != 0 { + // Could not find a value - we moved past to the next key. + if p.reverse { + p.prevKey() + } + return + } + + if !(p.curTS.Less(ts) || p.curTS.Equal(ts)) { + if ts == (hlc.Timestamp{}) { + // Zero timestamps come at the start. This case means there's + // no value at the zero timestamp, and we're sitting at a nonzero + // timestamp for the same key. Skip to the next key. + p.advanceKey() + return + } + // Potential ordering issue - this should never happen. + panic("timestamps encountered out of order") + } + + if uncertaintyCheck && p.ts.Less(p.curTS) { + p.err = p.uncertaintyError(p.curTS) + return + } + + // Check to ensure we don't unintentionally add an intent to results. + if p.curTS != (hlc.Timestamp{}) { + p.addKV(p.curRawKey, p.curValue) + } + p.advanceKey() +} + +// Adds the specified value to the result set, excluding tombstones unless +// p.tombstones is true. +func (p *pebbleMVCCScanner) addKV(key []byte, val []byte) { + if len(val) > 0 || p.tombstones { + p.results.put(key, val) + } +} + +// Returns an uncertainty error with the specified timestamp and p.txn. +func (p *pebbleMVCCScanner) uncertaintyError(ts hlc.Timestamp) error { + if ts.WallTime == 0 && ts.Logical == 0 { + return nil + } + + return roachpb.NewReadWithinUncertaintyIntervalError( + p.ts, ts, p.txn) +} + +// Try to read from the current value's intent history. Assumes p.meta has been +// unmarshalled already. Returns true if a value was read and added to the +// result set. +func (p *pebbleMVCCScanner) getFromIntentHistory() bool { + intentHistory := p.meta.IntentHistory + // upIdx is the index of the first intent in intentHistory with a sequence + // number greater than our transaction's sequence number. Subtract 1 from it + // to get the index of the intent with the highest sequence number that is + // still less than or equal to p.txnSeq. + upIdx := sort.Search(len(intentHistory), func(i int) bool { + return intentHistory[i].Sequence > p.txn.Sequence + }) + if upIdx == 0 { + // It is possible that no intent exists such that the sequence is less + // than the read sequence. In this case, we cannot read a value from the + // intent history. + return false + } + intent := p.meta.IntentHistory[upIdx - 1] + p.addKV(p.curRawKey, intent.Value) + return true +} + +// Emit a tuple (using p.addKV) and return true if we have reason to believe +// iteration can continue. +func (p *pebbleMVCCScanner) getAndAdvance() bool { + if !p.parent.Valid() { + return false + } + p.err = nil + + mvccKey := MVCCKey{p.curKey, p.curTS} + if mvccKey.IsValue() { + if !p.ts.Less(p.curTS) { + // 1. Fast path: there is no intent and our read timestamp is newer than + // the most recent version's timestamp. + p.addKV(p.curRawKey, p.curValue) + p.advanceKey() + return true + } + + if p.checkUncertainty { + // 2. Our txn's read timestamp is less than the max timestamp + // seen by the txn. We need to check for clock uncertainty + // errors. + if !p.txn.MaxTimestamp.Less(p.curTS) { + p.err = p.uncertaintyError(p.curTS) + return false + } + + p.seekVersion(p.txn.MaxTimestamp, true) + if p.err != nil { + return false + } + return true + } + + // 3. Our txn's read timestamp is greater than or equal to the + // max timestamp seen by the txn so clock uncertainty checks are + // unnecessary. We need to seek to the desired version of the + // value (i.e. one with a timestamp earlier than our read + // timestamp). + p.seekVersion(p.ts, false) + if p.err != nil { + return false + } + return true + } + + if len(p.curValue) == 0 { + p.err = errors.Errorf("zero-length mvcc metadata") + return false + } + p.meta.Reset() + err := p.meta.Unmarshal(p.curValue) + if err != nil { + p.err = errors.Errorf("unable to decode MVCCMetadata: %s", err) + return false + } + if len(p.meta.RawBytes) != 0 { + // 4. Emit immediately if the value is inline. + p.addKV(p.curRawKey, p.meta.RawBytes) + p.advanceKey() + return true + } + + if p.meta.Txn == nil { + p.err = errors.Errorf("intent without transaction") + return false + } + metaTS := hlc.Timestamp(p.meta.Timestamp) + + // metaTS is the timestamp of an intent value, which we may or may + // not end up ignoring, depending on factors codified below. If we do ignore + // the intent then we want to read at a lower timestamp that's strictly + // below the intent timestamp (to skip the intent), but also does not exceed + // our read timestamp (to avoid erroneously picking up future committed + // values); this timestamp is prevTS. + prevTS := p.ts + if !p.ts.Less(metaTS) { + prevTS = metaTS.Prev() + } + + ownIntent := p.txn != nil && p.meta.Txn.ID.Equal(p.txn.ID) + maxVisibleTS := p.ts + if p.checkUncertainty { + maxVisibleTS = p.txn.MaxTimestamp + } + + if maxVisibleTS.Less(metaTS) && !ownIntent { + // 5. The key contains an intent, but we're reading before the + // intent. Seek to the desired version. Note that if we own the + // intent (i.e. we're reading transactionally) we want to read + // the intent regardless of our read timestamp and fall into + // case 8 below. + p.seekVersion(p.ts, false) + if p.err != nil { + return false + } + return true + } + + if p.inconsistent { + // 6. The key contains an intent and we're doing an inconsistent + // read at a timestamp newer than the intent. We ignore the + // intent by insisting that the timestamp we're reading at is a + // historical timestamp < the intent timestamp. However, we + // return the intent separately; the caller may want to resolve + // it. + if p.results.count == p.maxKeys { + // We've already retrieved the desired number of keys and now + // we're adding the resume key. We don't want to add the + // intent here as the intents should only correspond to KVs + // that lie before the resume key. + return false + } + p.err = p.intents.Set(p.curRawKey, p.curValue, nil) + if p.err != nil { + return false + } + + p.seekVersion(prevTS, false) + if p.err != nil { + return false + } + return true + } + + if !ownIntent { + // 7. The key contains an intent which was not written by our + // transaction and our read timestamp is newer than that of the + // intent. Note that this will trigger an error on the Go + // side. We continue scanning so that we can return all of the + // intents in the scan range. + p.err = p.intents.Set(p.curRawKey, p.curValue, nil) + if p.err != nil { + return false + } + p.advanceKey() + if p.err != nil { + return false + } + return true + } + + if p.txn != nil && p.txn.Epoch == p.meta.Txn.Epoch { + if p.ignoreSeq || (p.txn.Sequence >= p.meta.Txn.Sequence) { + // 8. We're reading our own txn's intent at an equal or higher sequence. + // Note that we read at the intent timestamp, not at our read timestamp + // as the intent timestamp may have been pushed forward by another + // transaction. Txn's always need to read their own writes. + p.seekVersion(metaTS, false) + } else { + // 9. We're reading our own txn's intent at a lower sequence than is + // currently present in the intent. This means the intent we're seeing + // was written at a higher sequence than the read and that there may or + // may not be earlier versions of the intent (with lower sequence + // numbers) that we should read. If there exists a value in the intent + // history that has a sequence number equal to or less than the read + // sequence, read that value. + found := p.getFromIntentHistory(); + if found { + p.advanceKey() + return true + } + // 10. If no value in the intent history has a sequence number equal to + // or less than the read, we must ignore the intents laid down by the + // transaction all together. We ignore the intent by insisting that the + // timestamp we're reading at is a historical timestamp < the intent + // timestamp. + p.seekVersion(prevTS, false) + } + if p.err != nil { + return false + } + return true + } + + if p.txn != nil && (p.txn.Epoch < p.meta.Txn.Epoch) { + // 11. We're reading our own txn's intent but the current txn has + // an earlier epoch than the intent. Return an error so that the + // earlier incarnation of our transaction aborts (presumably + // this is some operation that was retried). + p.err = errors.Errorf("failed to read with epoch %d due to a write intent with epoch %d", + p.txn.Epoch, p.meta.Txn.Epoch) + } + + // 12. We're reading our own txn's intent but the current txn has a + // later epoch than the intent. This can happen if the txn was + // restarted and an earlier iteration wrote the value we're now + // reading. In this case, we ignore the intent and read the + // previous value as if the transaction were starting fresh. + p.seekVersion(prevTS, false) + if p.err != nil { + return false + } + return true +} diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index fada72a78070..bd1b9d46eaa2 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -33,14 +33,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/logtags" - humanize "github.com/dustin/go-humanize" - "github.com/elastic/gosigar" "github.com/pkg/errors" ) @@ -892,83 +889,7 @@ func (r *RocksDB) Iterate(start, end MVCCKey, f func(MVCCKeyValue) (bool, error) // Capacity queries the underlying file system for disk capacity information. func (r *RocksDB) Capacity() (roachpb.StoreCapacity, error) { - fileSystemUsage := gosigar.FileSystemUsage{} - dir := r.cfg.Dir - if dir == "" { - // This is an in-memory instance. Pretend we're empty since we - // don't know better and only use this for testing. Using any - // part of the actual file system here can throw off allocator - // rebalancing in a hard-to-trace manner. See #7050. - return roachpb.StoreCapacity{ - Capacity: r.cfg.MaxSizeBytes, - Available: r.cfg.MaxSizeBytes, - }, nil - } - if err := fileSystemUsage.Get(dir); err != nil { - return roachpb.StoreCapacity{}, err - } - - if fileSystemUsage.Total > math.MaxInt64 { - return roachpb.StoreCapacity{}, fmt.Errorf("unsupported disk size %s, max supported size is %s", - humanize.IBytes(fileSystemUsage.Total), humanizeutil.IBytes(math.MaxInt64)) - } - if fileSystemUsage.Avail > math.MaxInt64 { - return roachpb.StoreCapacity{}, fmt.Errorf("unsupported disk size %s, max supported size is %s", - humanize.IBytes(fileSystemUsage.Avail), humanizeutil.IBytes(math.MaxInt64)) - } - fsuTotal := int64(fileSystemUsage.Total) - fsuAvail := int64(fileSystemUsage.Avail) - - // Find the total size of all the files in the r.dir and all its - // subdirectories. - var totalUsedBytes int64 - if errOuter := filepath.Walk(r.cfg.Dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - // This can happen if rocksdb removes files out from under us - just keep - // going to get the best estimate we can. - if os.IsNotExist(err) { - return nil - } - // Special-case: if the store-dir is configured using the root of some fs, - // e.g. "/mnt/db", we might have special fs-created files like lost+found - // that we can't read, so just ignore them rather than crashing. - if os.IsPermission(err) && filepath.Base(path) == "lost+found" { - return nil - } - return err - } - if info.Mode().IsRegular() { - totalUsedBytes += info.Size() - } - return nil - }); errOuter != nil { - return roachpb.StoreCapacity{}, errOuter - } - - // If no size limitation have been placed on the store size or if the - // limitation is greater than what's available, just return the actual - // totals. - if r.cfg.MaxSizeBytes == 0 || r.cfg.MaxSizeBytes >= fsuTotal || r.cfg.Dir == "" { - return roachpb.StoreCapacity{ - Capacity: fsuTotal, - Available: fsuAvail, - Used: totalUsedBytes, - }, nil - } - - available := r.cfg.MaxSizeBytes - totalUsedBytes - if available > fsuAvail { - available = fsuAvail - } - if available < 0 { - available = 0 - } - - return roachpb.StoreCapacity{ - Capacity: r.cfg.MaxSizeBytes, - Available: available, - Used: totalUsedBytes, - }, nil + return computeCapacity(r.cfg.Dir, r.cfg.MaxSizeBytes) } // Compact forces compaction over the entire database. @@ -3378,6 +3299,8 @@ func notFoundErrOrDefault(err error) error { // DBFile is an interface for interacting with DBWritableFile in RocksDB. type DBFile interface { // Append appends data to this DBFile. + // + // TODO(itsbilal): Rename this to Write([]byte) to adhere to io.Writer. Append(data []byte) error // Close closes this DBFile. Close() error diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index 858f213d0843..7c0371142315 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -370,6 +370,10 @@ func (r *Replica) handleNoRaftLogDeltaResult(ctx context.Context) { func (r *Replica) handleSuggestedCompactionsResult( ctx context.Context, scs []storagepb.SuggestedCompaction, ) { + // TODO(itsbilal): Remove this check once Pebble supports GetSSTables + if r.store.compactor == nil { + return + } for _, sc := range scs { r.store.compactor.Suggest(ctx, sc) } diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go index e758cb9d5d4f..ba0800435d64 100644 --- a/pkg/storage/replica_destroy.go +++ b/pkg/storage/replica_destroy.go @@ -98,7 +98,10 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS // // TODO(benesch): we would ideally atomically suggest the compaction with // the deletion of the data itself. - if ms != (enginepb.MVCCStats{}) { + // + // TODO(itsbilal): Remove the compactor != nil check once Pebble supports + // GetSSTables. + if ms != (enginepb.MVCCStats{}) && r.store.compactor != nil { desc := r.Desc() r.store.compactor.Suggest(ctx, storagepb.SuggestedCompaction{ StartKey: roachpb.Key(desc.StartKey), diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 67d86490321c..d1f624206acc 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -816,17 +816,20 @@ func NewStore( s.txnWaitMetrics = txnwait.NewMetrics(cfg.HistogramWindowInterval) s.metrics.registry.AddMetricStruct(s.txnWaitMetrics) - s.compactor = compactor.NewCompactor( - s.cfg.Settings, - s.engine.(engine.WithSSTables), - func() (roachpb.StoreCapacity, error) { - return s.Capacity(false /* useCached */) - }, - func(ctx context.Context) { - s.asyncGossipStore(ctx, "compactor-initiated rocksdb compaction", false /* useCached */) - }, - ) - s.metrics.registry.AddMetricStruct(s.compactor.Metrics) + // TODO(itsbilal): Remove this check once Pebble supports GetSSTables. + if engine, ok := s.engine.(engine.WithSSTables); ok { + s.compactor = compactor.NewCompactor( + s.cfg.Settings, + engine, + func() (roachpb.StoreCapacity, error) { + return s.Capacity(false /* useCached */) + }, + func(ctx context.Context) { + s.asyncGossipStore(ctx, "compactor-initiated rocksdb compaction", false /* useCached */) + }, + ) + s.metrics.registry.AddMetricStruct(s.compactor.Metrics) + } s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit) @@ -1441,7 +1444,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { } // Start the storage engine compactor. - if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { + if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) && s.compactor != nil { s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper) }