Skip to content

Commit

Permalink
storage: Update Engine/Reader/Writer interfaces for ScanInternal
Browse files Browse the repository at this point in the history
This change updates pkg/storage interfaces and implementations to allow
the use of ScanInternal in skip-shared iteration mode as well as
writing/reading of internal point keys, range dels and range keys.
Replication / snapshot code will soon rely on these changes to
be able to replicate internal keys in higher levels plus metadata
of shared sstables in lower levels, as opposed to just observed
user keys.

Part of #103028

Epic: none

Release note: None
  • Loading branch information
itsbilal committed Jul 21, 2023
1 parent e0235d0 commit 6862371
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/spanset/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//rangekey",
],
)

Expand Down
48 changes: 48 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
)

// MVCCIterator wraps a storage.MVCCIterator and ensures that it can
Expand Down Expand Up @@ -445,6 +446,17 @@ type spanSetReader struct {

var _ storage.Reader = spanSetReader{}

// ScanInternal implements the storage.Reader interface. The bounds and every
// visitor callback are handed to the wrapped reader unchanged; this method
// itself performs no span assertion.
func (s spanSetReader) ScanInternal(
	ctx context.Context,
	lower, upper roachpb.Key,
	visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
	visitRangeDel func(start, end []byte, seqNum uint64) error,
	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
	visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
	return s.r.ScanInternal(
		ctx, lower, upper,
		visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile,
	)
}

// Close calls Close on the wrapped reader s.r.
func (s spanSetReader) Close() {
s.r.Close()
}
Expand Down Expand Up @@ -507,6 +519,18 @@ type spanSetWriter struct {
ts hlc.Timestamp
}

// ClearRawEncodedRange forwards the pre-encoded start and end keys to the
// wrapped writer s.w and returns its error. This method itself performs no
// span assertion.
func (s spanSetWriter) ClearRawEncodedRange(start, end []byte) error {
return s.w.ClearRawEncodedRange(start, end)
}

// PutInternalRangeKey forwards the internal range key to the wrapped writer
// s.w and returns its error. This method itself performs no span assertion.
func (s spanSetWriter) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
return s.w.PutInternalRangeKey(start, end, key)
}

// PutInternalKey forwards the internal key and value to the wrapped writer
// s.w and returns its error. This method itself performs no span assertion.
func (s spanSetWriter) PutInternalKey(key *pebble.InternalKey, value []byte) error {
return s.w.PutInternalKey(key, value)
}

var _ storage.Writer = spanSetWriter{}

func (s spanSetWriter) ApplyBatchRepr(repr []byte, sync bool) error {
Expand Down Expand Up @@ -762,6 +786,18 @@ type spanSetBatch struct {

var _ storage.Batch = spanSetBatch{}

// ScanInternal is present so that spanSetBatch satisfies storage.Batch. It is
// not supported on a spanSetBatch: every call panics regardless of arguments.
func (s spanSetBatch) ScanInternal(
	ctx context.Context,
	lower, upper roachpb.Key,
	visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
	visitRangeDel func(start, end []byte, seqNum uint64) error,
	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
	visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
	// Internal scans are only used on Engine.
	panic("unimplemented")
}

// Commit commits through the underlying batch s.b, passing sync along
// unchanged, and returns its error.
func (s spanSetBatch) Commit(sync bool) error {
return s.b.Commit(sync)
}
Expand Down Expand Up @@ -794,6 +830,18 @@ func (s spanSetBatch) CommitStats() storage.BatchCommitStats {
return s.b.CommitStats()
}

// PutInternalRangeKey forwards the internal range key to the underlying batch
// s.b and returns its error. This method itself performs no span assertion.
func (s spanSetBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
return s.b.PutInternalRangeKey(start, end, key)
}

// PutInternalKey forwards the internal key and value to the underlying batch
// s.b and returns its error. This method itself performs no span assertion.
func (s spanSetBatch) PutInternalKey(key *pebble.InternalKey, value []byte) error {
return s.b.PutInternalKey(key, value)
}

// ClearRawEncodedRange forwards the pre-encoded start and end keys to the
// underlying batch s.b and returns its error. This method itself performs no
// span assertion.
func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error {
return s.b.ClearRawEncodedRange(start, end)
}

// NewBatch returns a storage.Batch that asserts access of the underlying
// Batch against the given SpanSet. We only consider span boundaries, associated
// timestamps are not considered.
Expand Down
32 changes: 28 additions & 4 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func (r *BatchReader) Value() []byte {
}
}

// EngineEndKey returns the engine end key of the current ranged batch entry.
func (r *BatchReader) EngineEndKey() (EngineKey, error) {
// EndKey returns the raw end key of the current ranged batch entry.
func (r *BatchReader) EndKey() ([]byte, error) {
var rawKey []byte
switch r.kind {
case pebble.InternalKeyKindRangeDelete:
Expand All @@ -160,14 +160,23 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) {
case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete:
rangeKeys, err := r.rangeKeys()
if err != nil {
return EngineKey{}, err
return nil, err
}
rawKey = rangeKeys.End

default:
return EngineKey{}, errors.AssertionFailedf(
return nil, errors.AssertionFailedf(
"can only ask for EndKey on a ranged entry, got %v", r.kind)
}
return rawKey, nil
}

// EngineEndKey returns the engine end key of the current ranged batch entry.
func (r *BatchReader) EngineEndKey() (EngineKey, error) {
rawKey, err := r.EndKey()
if err != nil {
return EngineKey{}, err
}

key, ok := DecodeEngineKey(rawKey)
if !ok {
Expand All @@ -176,6 +185,21 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) {
return key, nil
}

// RawRangeKeys returns the raw (undecoded) range keys carried by the current
// batch entry. The entry must be a range key set or unset; for any other kind
// an assertion-failure error is returned. Errors from decoding the entry's
// range keys are returned as-is.
func (r *BatchReader) RawRangeKeys() ([]rangekey.Key, error) {
	isSet := r.kind == pebble.InternalKeyKindRangeKeySet
	isUnset := r.kind == pebble.InternalKeyKindRangeKeyUnset
	if !isSet && !isUnset {
		return nil, errors.AssertionFailedf(
			"can only ask for range keys on a range key entry, got %v", r.kind)
	}

	decoded, err := r.rangeKeys()
	if err != nil {
		return nil, err
	}
	return decoded.Keys, nil
}

// EngineRangeKeys returns the engine range key values at the current entry.
func (r *BatchReader) EngineRangeKeys() ([]EngineRangeKeyValue, error) {
switch r.kind {
Expand Down
33 changes: 33 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
prometheusgo "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -598,6 +599,17 @@ type Reader interface {
// with the iterator to free resources. The caller can change IterOptions
// after this function returns.
NewEngineIterator(opts IterOptions) EngineIterator
// ScanInternal allows a caller to inspect the underlying engine's
// InternalKeys using a visitor pattern, while also allowing for keys in
// shared files to be skipped if a visitor is provided for visitSharedFile.
// Useful for fast-replicating state from one Reader to another.
ScanInternal(
ctx context.Context, lower, upper roachpb.Key,
visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error
// ConsistentIterators returns true if the Reader implementation guarantees
// that the different iterators constructed by this Reader will see the same
// underlying Engine state. This is not true about Batch writes: new iterators
Expand Down Expand Up @@ -690,6 +702,11 @@ type Writer interface {
// from the storage engine. It is safe to modify the contents of the arguments
// after it returns.
ClearRawRange(start, end roachpb.Key, pointKeys, rangeKeys bool) error
// ClearRawEncodedRange is similar to ClearRawRange, except it takes pre-encoded
// start, end keys and bypasses the EngineKey encoding step. It also only
// operates on point keys; for range keys, use ClearEngineRangeKey or
// PutInternalRangeKey.
ClearRawEncodedRange(start, end []byte) error
// ClearMVCCRange removes MVCC point and/or range keys (including intents)
// from start (inclusive) to end (exclusive) using Pebble range tombstones.
//
Expand Down Expand Up @@ -757,6 +774,17 @@ type Writer interface {
// It is safe to modify the contents of the arguments after it returns.
PutEngineRangeKey(start, end roachpb.Key, suffix, value []byte) error

// PutInternalRangeKey adds an InternalRangeKey to this batch. This is a very
// low-level method that should be used sparingly.
//
// It is safe to modify the contents of the arguments after it returns.
PutInternalRangeKey(start, end []byte, key rangekey.Key) error
// PutInternalKey adds an InternalKey to this batch. This is a very
// low-level method that should be used sparingly.
//
// It is safe to modify the contents of the arguments after it returns.
PutInternalKey(key *pebble.InternalKey, value []byte) error

// ClearEngineRangeKey clears the given range key. This is a general-purpose
// and low-level method that should be used sparingly, only when the other
// Clear* methods are not applicable.
Expand Down Expand Up @@ -984,6 +1012,11 @@ type Engine interface {
// additionally returns ingestion stats.
IngestExternalFilesWithStats(
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
// IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats
// that excises an ExciseSpan, and ingests either local or shared sstables or
// both.
IngestAndExciseExternalFiles(
ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error)
// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func BallastSize(size int64) ConfigOption {
// SharedStorage returns a ConfigOption that records sharedStorage on the
// engine config. When sharedStorage is non-nil, it also ensures the Pebble
// format major version is at least pebble.ExperimentalFormatVirtualSSTables.
//
// The version is only ever raised, never lowered, so a higher format major
// version configured by an earlier option is preserved. A nil sharedStorage
// leaves the format major version untouched.
func SharedStorage(sharedStorage cloud.ExternalStorage) ConfigOption {
	return func(cfg *engineConfig) error {
		cfg.SharedStorage = sharedStorage
		if sharedStorage == nil {
			// Shared storage is not enabled; there is no reason to ratchet the
			// on-disk format to an experimental version.
			return nil
		}
		// TODO(bilal): Do the format major version ratchet while accounting for
		// version upgrade finalization. However, seeing as shared storage is
		// an experimental feature and upgrading from existing stores is not
		// supported, this is fine.
		if cfg.Opts.FormatMajorVersion < pebble.ExperimentalFormatVirtualSSTables {
			cfg.Opts.FormatMajorVersion = pebble.ExperimentalFormatVirtualSSTables
		}
		return nil
	}
}
Expand Down
96 changes: 95 additions & 1 deletion pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,13 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
ValueBlocksEnabled.Get(&cfg.Settings.SV)
}
opts.Experimental.DisableIngestAsFlushable = func() bool {
return !IngestAsFlushable.Get(&cfg.Settings.SV)
// Disable flushable ingests if shared storage is enabled. This is because
// flushable ingests currently do not support Excise operations.
//
// TODO(bilal): Remove the first part of this || statement when
// https://github.com/cockroachdb/pebble/issues/2676 is completed, or when
// Pebble has better guards against this.
return cfg.SharedStorage != nil || !IngestAsFlushable.Get(&cfg.Settings.SV)
}

auxDir := opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
Expand Down Expand Up @@ -1445,6 +1451,38 @@ func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator {
return newPebbleIterator(p.db, opts, StandardDurability, p)
}

// ScanInternal implements the Engine interface. The lower and upper bounds
// are wrapped in EngineKeys (Key field only) and encoded, then passed together
// with the visitor callbacks to the Pebble DB's ScanInternal.
func (p *Pebble) ScanInternal(
	ctx context.Context,
	lower, upper roachpb.Key,
	visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
	visitRangeDel func(start, end []byte, seqNum uint64) error,
	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
	visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
	encLower := EngineKey{Key: lower}.Encode()
	encUpper := EngineKey{Key: upper}.Encode()
	return p.db.ScanInternal(
		ctx, encLower, encUpper,
		visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile,
	)
}

// ClearRawEncodedRange implements the Engine interface. It is not supported
// directly on a Pebble engine: every call panics.
func (p *Pebble) ClearRawEncodedRange(start, end []byte) error {
// Only used with batches and SST writers.
panic("unimplemented")
}

// PutInternalRangeKey implements the Engine interface. It is not supported
// directly on a Pebble engine: every call panics.
func (p *Pebble) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
// Only used with batches and SST writers.
panic("unimplemented")
}

// PutInternalKey implements the Engine interface. It is not supported
// directly on a Pebble engine: every call panics.
func (p *Pebble) PutInternalKey(key *pebble.InternalKey, value []byte) error {
// Only used with batches and SST writers.
panic("unimplemented")
}

// ConsistentIterators implements the Engine interface.
func (p *Pebble) ConsistentIterators() bool {
return false
Expand Down Expand Up @@ -2003,6 +2041,17 @@ func (p *Pebble) IngestExternalFilesWithStats(
return p.db.IngestWithStats(paths)
}

// IngestAndExciseExternalFiles implements the Engine interface. Both ends of
// exciseSpan are wrapped in EngineKeys (Key field only) and encoded to form
// the raw key range, which is passed with paths and shared to the Pebble DB's
// IngestAndExcise. The ctx argument is not used by this method.
func (p *Pebble) IngestAndExciseExternalFiles(
	ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span,
) (pebble.IngestOperationStats, error) {
	var rawSpan pebble.KeyRange
	rawSpan.Start = EngineKey{Key: exciseSpan.Key}.Encode()
	rawSpan.End = EngineKey{Key: exciseSpan.EndKey}.Encode()
	return p.db.IngestAndExcise(paths, shared, rawSpan)
}

// PreIngestDelay implements the Engine interface.
func (p *Pebble) PreIngestDelay(ctx context.Context) {
preIngestDelay(ctx, p, p.settings)
Expand Down Expand Up @@ -2411,10 +2460,41 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error {
return nil
}

// ScanInternal implements the Reader interface by delegating to the parent
// engine's ScanInternal with the same bounds and visitor callbacks.
func (p *pebbleReadOnly) ScanInternal(
	ctx context.Context,
	lower, upper roachpb.Key,
	visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
	visitRangeDel func(start, end []byte, seqNum uint64) error,
	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
	visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
	return p.parent.ScanInternal(
		ctx, lower, upper,
		visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile,
	)
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
// could be refactored so that a Reader could be supplied to evaluateBatch.

// Writer is the write interface to an engine's data.

// ClearRawEncodedRange implements the Writer interface. It is not implemented
// for pebbleReadOnly: every call panics.
func (p *pebbleReadOnly) ClearRawEncodedRange(start, end []byte) error {
// Only used with batches and SST writers.
panic("not implemented")
}

// PutInternalRangeKey implements the Writer interface. It is not implemented
// for pebbleReadOnly: every call panics.
func (p *pebbleReadOnly) PutInternalRangeKey(start, end []byte, key rangekey.Key) error {
// Only used with batches and SST writers.
panic("not implemented")
}

// PutInternalKey implements the Writer interface. It is not implemented for
// pebbleReadOnly: every call panics.
func (p *pebbleReadOnly) PutInternalKey(key *pebble.InternalKey, value []byte) error {
// Only used with batches and SST writers.
panic("not implemented")
}

// ApplyBatchRepr is not implemented for pebbleReadOnly: every call panics,
// regardless of repr and sync.
func (p *pebbleReadOnly) ApplyBatchRepr(repr []byte, sync bool) error {
panic("not implemented")
}
Expand Down Expand Up @@ -2588,6 +2668,20 @@ func (p *pebbleSnapshot) PinEngineStateForIterators() error {
return nil
}

// ScanInternal implements the Reader interface. The lower and upper bounds
// are wrapped in EngineKeys (Key field only) and encoded inline, then passed
// with the visitor callbacks to the Pebble snapshot's ScanInternal.
func (p *pebbleSnapshot) ScanInternal(
	ctx context.Context,
	lower, upper roachpb.Key,
	visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error,
	visitRangeDel func(start, end []byte, seqNum uint64) error,
	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
	visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
	return p.snapshot.ScanInternal(
		ctx,
		EngineKey{Key: lower}.Encode(),
		EngineKey{Key: upper}.Encode(),
		visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile,
	)
}

// ExceedMaxSizeError is the error returned when an export request
// fails due the export size exceeding the budget. This can be caused
// by large KVs that have many revisions.
Expand Down
Loading

0 comments on commit 6862371

Please sign in to comment.