diff --git a/pkg/kv/kvserver/spanset/BUILD.bazel b/pkg/kv/kvserver/spanset/BUILD.bazel index 42c79003d8fa..5afeede69aef 100644 --- a/pkg/kv/kvserver/spanset/BUILD.bazel +++ b/pkg/kv/kvserver/spanset/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//rangekey", ], ) diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 2b49b71c388e..731a7ad8c5a0 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/rangekey" ) // MVCCIterator wraps an storage.MVCCIterator and ensures that it can @@ -445,6 +446,17 @@ type spanSetReader struct { var _ storage.Reader = spanSetReader{} +func (s spanSetReader) ScanInternal( + ctx context.Context, + lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start []byte, end []byte, seqNum uint64) error, + visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) +} + func (s spanSetReader) Close() { s.r.Close() } @@ -762,6 +774,18 @@ type spanSetBatch struct { var _ storage.Batch = spanSetBatch{} +func (s spanSetBatch) ScanInternal( + ctx context.Context, + lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start []byte, end []byte, seqNum uint64) error, + visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + // Only used on Engine. + panic("unimplemented") +} + func (s spanSetBatch) Commit(sync bool) error { return s.b.Commit(sync) } @@ -794,6 +818,18 @@ func (s spanSetBatch) CommitStats() storage.BatchCommitStats { return s.b.CommitStats() } +func (s spanSetBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error { + return s.b.PutInternalRangeKey(start, end, key) +} + +func (s spanSetBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error { + return s.b.PutInternalPointKey(key, value) +} + +func (s spanSetBatch) ClearRawEncodedRange(start, end []byte) error { + return s.b.ClearRawEncodedRange(start, end) +} + // NewBatch returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet. We only consider span boundaries, associated // timestamps are not considered. diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 40a38fe6b2d4..35a8281d70ca 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -150,8 +150,8 @@ func (r *BatchReader) Value() []byte { } } -// EngineEndKey returns the engine end key of the current ranged batch entry. -func (r *BatchReader) EngineEndKey() (EngineKey, error) { +// EndKey returns the raw end key of the current ranged batch entry. +func (r *BatchReader) EndKey() ([]byte, error) { var rawKey []byte switch r.kind { case pebble.InternalKeyKindRangeDelete: @@ -160,14 +160,23 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) { case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete: rangeKeys, err := r.rangeKeys() if err != nil { - return EngineKey{}, err + return nil, err } rawKey = rangeKeys.End default: - return EngineKey{}, errors.AssertionFailedf( + return nil, errors.AssertionFailedf( "can only ask for EndKey on a ranged entry, got %v", r.kind) } + return rawKey, nil +} + +// EngineEndKey returns the engine end key of the current ranged batch entry. +func (r *BatchReader) EngineEndKey() (EngineKey, error) { + rawKey, err := r.EndKey() + if err != nil { + return EngineKey{}, err + } key, ok := DecodeEngineKey(rawKey) if !ok { @@ -176,6 +185,21 @@ func (r *BatchReader) EngineEndKey() (EngineKey, error) { return key, nil } +// RawRangeKeys returns the raw range key values at the current entry. +func (r *BatchReader) RawRangeKeys() ([]rangekey.Key, error) { + switch r.kind { + case pebble.InternalKeyKindRangeKeySet, pebble.InternalKeyKindRangeKeyUnset: + default: + return nil, errors.AssertionFailedf( + "can only ask for range keys on a range key entry, got %v", r.kind) + } + rangeKeys, err := r.rangeKeys() + if err != nil { + return nil, err + } + return rangeKeys.Keys, nil +} + // EngineRangeKeys returns the engine range key values at the current entry. func (r *BatchReader) EngineRangeKeys() ([]EngineRangeKeyValue, error) { switch r.kind { diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index be393d0e1355..fd0a313a1de6 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/redact" prometheusgo "github.com/prometheus/client_model/go" @@ -598,6 +599,26 @@ type Reader interface { // with the iterator to free resources. The caller can change IterOptions // after this function returns. NewEngineIterator(opts IterOptions) EngineIterator + // ScanInternal allows a caller to inspect the underlying engine's InternalKeys + // using a visitor pattern, while also allowing for keys in shared files to be + // skipped if a visitor is provided for visitSharedFiles. Useful for + // fast-replicating state from one Reader to another. Point keys are collapsed + // such that only one internal key per user key is exposed, and rangedels and + // range keys are collapsed and defragmented with each span being surfaced + // exactly once, alongside the highest seqnum for a rangedel on that span + // (for rangedels) or all coalesced rangekey.Keys in that span (for range + // keys). A point key deleted by a rangedel will not be exposed, but the + // rangedel would be exposed. + // + // Note that ScanInternal does not obey the guarantees indicated by + // ConsistentIterators. + ScanInternal( + ctx context.Context, lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start, end []byte, seqNum uint64) error, + visitRangeKey func(start, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, + ) error // ConsistentIterators returns true if the Reader implementation guarantees // that the different iterators constructed by this Reader will see the same // underlying Engine state. This is not true about Batch writes: new iterators @@ -854,6 +875,32 @@ type Writer interface { BufferedSize() int } +// InternalWriter is an extension of Writer that supports additional low-level +// methods to operate on internal keys in Pebble. These additional methods +// should only be used sparingly, when one of the high-level methods cannot +// achieve the same ends. +type InternalWriter interface { + Writer + // ClearRawEncodedRange is similar to ClearRawRange, except it takes pre-encoded + // start, end keys and bypasses the EngineKey encoding step. It also only + // operates on point keys; for range keys, use ClearEngineRangeKey or + // PutInternalRangeKey. + // + // It is safe to modify the contents of the arguments after it returns. + ClearRawEncodedRange(start, end []byte) error + + // PutInternalRangeKey adds an InternalRangeKey to this batch. This is a very + // low-level method that should be used sparingly. + // + // It is safe to modify the contents of the arguments after it returns. + PutInternalRangeKey(start, end []byte, key rangekey.Key) error + // PutInternalPointKey adds a point InternalKey to this batch. This is a very + // low-level method that should be used sparingly. + // + // It is safe to modify the contents of the arguments after it returns. + PutInternalPointKey(key *pebble.InternalKey, value []byte) error +} + // ClearOptions holds optional parameters to methods that clear keys from the // storage engine. type ClearOptions struct { @@ -984,6 +1031,11 @@ type Engine interface { // additionally returns ingestion stats. IngestExternalFilesWithStats( ctx context.Context, paths []string) (pebble.IngestOperationStats, error) + // IngestAndExciseExternalFiles is a variant of IngestExternalFilesWithStats + // that excises an ExciseSpan, and ingests either local or shared sstables or + // both. + IngestAndExciseExternalFiles( + ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error) // PreIngestDelay offers an engine the chance to backpressure ingestions. // When called, it may choose to block if the engine determines that it is in // or approaching a state where further ingestions may risk its health. @@ -1048,7 +1100,7 @@ type Batch interface { // WriteBatch is the interface for write batch specific operations. type WriteBatch interface { - Writer + InternalWriter // Close closes the batch, freeing up any outstanding resources. Close() // Commit atomically applies any batched updates to the underlying engine. If diff --git a/pkg/storage/open.go b/pkg/storage/open.go index e6083b69cf55..a329be6c6622 100644 --- a/pkg/storage/open.go +++ b/pkg/storage/open.go @@ -181,6 +181,11 @@ func BallastSize(size int64) ConfigOption { func SharedStorage(sharedStorage cloud.ExternalStorage) ConfigOption { return func(cfg *engineConfig) error { cfg.SharedStorage = sharedStorage + // TODO(bilal): Do the format major version ratchet while accounting for + // version upgrade finalization. However, seeing as shared storage is + // an experimental feature and upgrading from existing stores is not + // supported, this is fine. + cfg.Opts.FormatMajorVersion = pebble.ExperimentalFormatVirtualSSTables return nil } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 63705f2264d1..dffa251ce308 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1025,7 +1025,13 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { ValueBlocksEnabled.Get(&cfg.Settings.SV) } opts.Experimental.DisableIngestAsFlushable = func() bool { - return !IngestAsFlushable.Get(&cfg.Settings.SV) + // Disable flushable ingests if shared storage is enabled. This is because + // flushable ingests currently do not support Excise operations. + // + // TODO(bilal): Remove the first part of this || statement when + // https://github.com/cockroachdb/pebble/issues/2676 is completed, or when + // Pebble has better guards against this. + return cfg.SharedStorage != nil || !IngestAsFlushable.Get(&cfg.Settings.SV) } auxDir := opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir) @@ -1475,6 +1481,20 @@ func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator { return newPebbleIterator(p.db, opts, StandardDurability, p) } +// ScanInternal implements the Engine interface. +func (p *Pebble) ScanInternal( + ctx context.Context, + lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start []byte, end []byte, seqNum uint64) error, + visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + rawLower := EngineKey{Key: lower}.Encode() + rawUpper := EngineKey{Key: upper}.Encode() + return p.db.ScanInternal(ctx, rawLower, rawUpper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) +} + // ConsistentIterators implements the Engine interface. func (p *Pebble) ConsistentIterators() bool { return false @@ -2033,6 +2053,17 @@ func (p *Pebble) IngestExternalFilesWithStats( return p.db.IngestWithStats(paths) } +// IngestAndExciseExternalFiles implements the Engine interface. +func (p *Pebble) IngestAndExciseExternalFiles( + ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span, +) (pebble.IngestOperationStats, error) { + rawSpan := pebble.KeyRange{ + Start: EngineKey{Key: exciseSpan.Key}.Encode(), + End: EngineKey{Key: exciseSpan.EndKey}.Encode(), + } + return p.db.IngestAndExcise(paths, shared, rawSpan) +} + // PreIngestDelay implements the Engine interface. func (p *Pebble) PreIngestDelay(ctx context.Context) { preIngestDelay(ctx, p, p.settings) @@ -2444,10 +2475,23 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error { return nil } +// ScanInternal implements the Reader interface. +func (p *pebbleReadOnly) ScanInternal( + ctx context.Context, + lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start []byte, end []byte, seqNum uint64) error, + visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + return p.parent.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) +} + // Writer methods are not implemented for pebbleReadOnly. Ideally, the code // could be refactored so that a Reader could be supplied to evaluateBatch // Writer is the write interface to an engine's data. + func (p *pebbleReadOnly) ApplyBatchRepr(repr []byte, sync bool) error { panic("not implemented") } @@ -2621,6 +2665,20 @@ func (p *pebbleSnapshot) PinEngineStateForIterators() error { return nil } +// ScanInternal implements the Reader interface. +func (p *pebbleSnapshot) ScanInternal( + ctx context.Context, + lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start []byte, end []byte, seqNum uint64) error, + visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + rawLower := EngineKey{Key: lower}.Encode() + rawUpper := EngineKey{Key: upper}.Encode() + return p.snapshot.ScanInternal(ctx, rawLower, rawUpper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) +} + // ExceedMaxSizeError is the error returned when an export request // fails due the export size exceeding the budget. This can be caused // by large KVs that have many revisions. diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 36d45def93c1..d63ca6f2679b 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/rangekey" ) // Wrapper struct around a pebble.Batch. @@ -257,6 +258,23 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { return iter } +// ScanInternal implements the Reader interface. +func (p *pebbleBatch) ScanInternal( + ctx context.Context, + lower, upper roachpb.Key, + visitPointKey func(key *pebble.InternalKey, value pebble.LazyValue) error, + visitRangeDel func(start []byte, end []byte, seqNum uint64) error, + visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, + visitSharedFile func(sst *pebble.SharedSSTMeta) error, +) error { + panic("ScanInternal only supported on Engine and Snapshot.") +} + +// ClearRawEncodedRange implements the InternalWriter interface. +func (p *pebbleBatch) ClearRawEncodedRange(start, end []byte) error { + return p.batch.DeleteRange(start, end, pebble.Sync) +} + // ConsistentIterators implements the Batch interface. func (p *pebbleBatch) ConsistentIterators() bool { return true @@ -482,6 +500,20 @@ func (p *pebbleBatch) PutEngineRangeKey(start, end roachpb.Key, suffix, value [] EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix, value, nil) } +// PutInternalRangeKey implements the InternalWriter interface. +func (p *pebbleBatch) PutInternalRangeKey(start, end []byte, key rangekey.Key) error { + switch key.Kind() { + case pebble.InternalKeyKindRangeKeyUnset: + return p.batch.RangeKeyUnset(start, end, key.Suffix, nil /* writeOptions */) + case pebble.InternalKeyKindRangeKeySet: + return p.batch.RangeKeySet(start, end, key.Suffix, key.Value, nil /* writeOptions */) + case pebble.InternalKeyKindRangeKeyDelete: + return p.batch.RangeKeyDelete(start, end, nil /* writeOptions */) + default: + panic("unexpected range key kind") + } +} + // ClearEngineRangeKey implements the Engine interface. func (p *pebbleBatch) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) error { return p.batch.RangeKeyUnset( @@ -542,6 +574,14 @@ func (p *pebbleBatch) PutEngineKey(key EngineKey, value []byte) error { return p.batch.Set(p.buf, value, nil) } +// PutInternalPointKey implements the WriteBatch interface. +func (p *pebbleBatch) PutInternalPointKey(key *pebble.InternalKey, value []byte) error { + if len(key.UserKey) == 0 { + return emptyKeyError() + } + return p.batch.AddInternalKey(key, value, nil /* writeOptions */) +} + func (p *pebbleBatch) put(key MVCCKey, value []byte) error { if len(key.Key) == 0 { return emptyKeyError() diff --git a/pkg/storage/sst_writer.go b/pkg/storage/sst_writer.go index 2def95a54895..01538caeafa5 100644 --- a/pkg/storage/sst_writer.go +++ b/pkg/storage/sst_writer.go @@ -20,7 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/sstable" ) @@ -37,6 +39,7 @@ type SSTWriter struct { var _ Writer = &SSTWriter{} var _ ExportWriter = &SSTWriter{} +var _ InternalWriter = &SSTWriter{} // noopFinishAbort is used to wrap io.Writers for sstable.Writer. type noopFinishAbort struct { @@ -228,6 +231,56 @@ func (fw *SSTWriter) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) return fw.fw.RangeKeyUnset(EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix) } +// ClearRawEncodedRange implements the InternalWriter interface. +func (fw *SSTWriter) ClearRawEncodedRange(start, end []byte) error { + startEngine, ok := DecodeEngineKey(start) + if !ok { + return errors.New("cannot decode start engine key") + } + endEngine, ok := DecodeEngineKey(end) + if !ok { + return errors.New("cannot decode end engine key") + } + fw.DataSize += int64(len(startEngine.Key)) + int64(len(endEngine.Key)) + return fw.fw.DeleteRange(start, end) +} + +// PutInternalRangeKey implements the InternalWriter interface. +func (fw *SSTWriter) PutInternalRangeKey(start, end []byte, key rangekey.Key) error { + if !fw.supportsRangeKeys { + return errors.New("range keys not supported by SST writer") + } + startEngine, ok := DecodeEngineKey(start) + if !ok { + return errors.New("cannot decode engine key") + } + endEngine, ok := DecodeEngineKey(end) + if !ok { + return errors.New("cannot decode engine key") + } + fw.DataSize += int64(len(startEngine.Key)) + int64(len(endEngine.Key)) + int64(len(key.Value)) + switch key.Kind() { + case pebble.InternalKeyKindRangeKeyUnset: + return fw.fw.RangeKeyUnset(start, end, key.Suffix) + case pebble.InternalKeyKindRangeKeySet: + return fw.fw.RangeKeySet(start, end, key.Suffix, key.Value) + case pebble.InternalKeyKindRangeKeyDelete: + return fw.fw.RangeKeyDelete(start, end) + default: + panic("unexpected range key kind") + } +} + +// PutInternalPointKey implements the InternalWriter interface. +func (fw *SSTWriter) PutInternalPointKey(key *pebble.InternalKey, value []byte) error { + ek, ok := DecodeEngineKey(key.UserKey) + if !ok { + return errors.New("cannot decode engine key") + } + fw.DataSize += int64(len(ek.Key)) + int64(len(value)) + return fw.fw.Add(*key, value) +} + // clearRange clears all point keys in the given range by dropping a Pebble // range tombstone. //