Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch: add RangeKey{Set,Unset,Delete} methods #1386

Merged
merged 1 commit into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 188 additions & 21 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/private"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/rawalloc"
)

Expand Down Expand Up @@ -72,10 +73,11 @@ func (d DeferredBatchOp) Finish() {
}
}

// A Batch is a sequence of Sets, Merges, Deletes, and/or DeleteRanges that are
// applied atomically. Batch implements the Reader interface, but only an
// indexed batch supports reading (without error) via Get or NewIter. A
// non-indexed batch will return ErrNotIndexed when read from .
// A Batch is a sequence of Sets, Merges, Deletes, DeleteRanges, RangeKeySets,
// RangeKeyUnsets, and/or RangeKeyDeletes that are applied atomically. Batch
// implements the Reader interface, but only an indexed batch supports reading
// (without error) via Get or NewIter. A non-indexed batch will return
// ErrNotIndexed when read from .
//
// Indexing
//
Expand All @@ -86,8 +88,8 @@ func (d DeferredBatchOp) Finish() {
// in the LSM where every entry in the batch is considered newer than any entry
// in the underlying database (batch entries have the InternalKeySeqNumBatch
// bit set). By treating the batch as an additional layer in the LSM, iteration
// supports all batch operations (i.e. Set, Merge, Delete, and DeleteRange)
// with minimal effort.
// supports all batch operations (i.e. Set, Merge, Delete, DeleteRange,
// RangeKeySet, RangeKeyUnset, RangeKeyDelete) with minimal effort.
//
// The same key can be operated on multiple times in a batch, though only the
// latest operation will be visible. For example, Put("a", "b"), Delete("a")
Expand Down Expand Up @@ -155,14 +157,21 @@ func (d DeferredBatchOp) Finish() {
// exactly those specified by InternalKeyKind. The following table shows the
// format for records of each kind:
//
// InternalKeyKindDelete varstring
// InternalKeyKindLogData varstring
// InternalKeyKindSet varstring varstring
// InternalKeyKindMerge varstring varstring
// InternalKeyKindRangeDelete varstring varstring
// InternalKeyKindDelete varstring
// InternalKeyKindLogData varstring
// InternalKeyKindSet varstring varstring
// InternalKeyKindMerge varstring varstring
// InternalKeyKindRangeDelete varstring varstring
// InternalKeyKindRangeKeySet varstring varstring
// InternalKeyKindRangeKeyUnset varstring varstring
// InternalKeyKindRangeKeyDelete varstring varstring
//
// The intuitive understanding here are that the arguments to Delete(), Set(),
// Merge(), and DeleteRange() are encoded into the batch.
// The intuitive understanding here are that the arguments to Delete, Set,
// Merge, DeleteRange and RangeKeyDelete are encoded into the batch. The
// RangeKeySet and RangeKeyUnset operations are slightly more complicated,
// encoding their end key, suffix and value [in the case of RangeKeySet] within
// the Value varstring. For more information on the value encoding for
// RangeKeySet and RangeKeyUnset, see the internal/rangekey package.
//
// The internal batch representation is the on disk format for a batch in the
// WAL, and thus stable. New record kinds may be added, but the existing ones
Expand Down Expand Up @@ -208,19 +217,32 @@ type Batch struct {
// deletion is added.
countRangeDels uint64

// The count of range key sets, unsets and deletes in the batch. Updated
// every time a RANGEKEYSET, RANGEKEYUNSET or RANGEKEYDEL key is added.
// TODO(jackson): This likely won't be necessary long-term, but it's useful
// for the in-memory only implementation in which these keys require special
// handling.
countRangeKeys uint64

// A deferredOp struct, stored in the Batch so that a pointer can be returned
// from the *Deferred() methods rather than a value.
deferredOp DeferredBatchOp

// An optional skiplist keyed by offset into data of the entry.
index *batchskl.Skiplist
rangeDelIndex *batchskl.Skiplist
rangeKeyIndex *batchskl.Skiplist

// Fragmented range deletion tombstones. Cached the first time a range
// deletion iterator is requested. The cache is invalidated whenever a new
// range deletion is added to the batch.
tombstones []keyspan.Span

// Fragmented range key spans. Cached the first time a range key iterator is
// requested. The cache is invalidated whenever a new range key
// (RangeKey{Set,Unset,Del}) is added to the batch.
rangeKeys []keyspan.Span

// The flushableBatch wrapper if the batch is too large to fit in the
// memtable.
flushable *flushableBatch
Expand Down Expand Up @@ -290,7 +312,7 @@ func (b *Batch) release() {
if b.index == nil {
batchPool.Put(b)
} else {
b.index, b.rangeDelIndex = nil, nil
b.index, b.rangeDelIndex, b.rangeKeyIndex = nil, nil, nil
indexedBatchPool.Put((*indexedBatch)(unsafe.Pointer(b)))
}
}
Expand All @@ -302,14 +324,18 @@ func (b *Batch) refreshMemTableSize() {
}

b.countRangeDels = 0
b.countRangeKeys = 0
for r := b.Reader(); ; {
kind, key, value, ok := r.Next()
if !ok {
break
}
b.memTableSize += memTableEntrySize(len(key), len(value))
if kind == InternalKeyKindRangeDelete {
switch kind {
case InternalKeyKindRangeDelete:
b.countRangeDels++
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.countRangeKeys++
}
}
}
Expand Down Expand Up @@ -343,18 +369,28 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
if !ok {
break
}
if kind == InternalKeyKindRangeDelete {
switch kind {
case InternalKeyKindRangeDelete:
b.countRangeDels++
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.countRangeKeys++
}
if b.index != nil {
var err error
if kind == InternalKeyKindRangeDelete {
switch kind {
case InternalKeyKindRangeDelete:
b.tombstones = nil
if b.rangeDelIndex == nil {
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
}
err = b.rangeDelIndex.Add(uint32(offset))
} else {
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.rangeKeys = nil
if b.rangeKeyIndex == nil {
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
}
err = b.rangeKeyIndex.Add(uint32(offset))
default:
err = b.index.Add(uint32(offset))
}
if err != nil {
Expand Down Expand Up @@ -578,8 +614,9 @@ func (b *Batch) SingleDeleteDeferred(keyLen int) *DeferredBatchOp {
return &b.deferredOp
}

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
// DeleteRange deletes all of the point keys (and values) in the range
// [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT
// delete overlapping range keys (eg, keys set via RangeKeySet).
//
// It is safe to modify the contents of the arguments after DeleteRange
// returns.
Expand Down Expand Up @@ -618,6 +655,136 @@ func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
return &b.deferredOp
}

// Experimental returns the same batch, with experimental portions of the API
// exposed.
func (b *Batch) Experimental() ExperimentalBatch {
return ExperimentalBatch{b}
}

// ExperimentalBatch provides access to experimental features of a Batch.
type ExperimentalBatch struct {
*Batch
}

// RangeKeySet sets a range key mapping the key range [start, end) at the MVCC
// timestamp suffix to value. The suffix is optional. If any portion of the key
// range [start, end) is already set by a range key with the same suffix value,
// RangeKeySet overrides it.
//
// It is safe to modify the contents of the arguments after RangeKeySet returns.
//
// WARNING: This is an experimental feature with limited functionality.
func (b ExperimentalBatch) RangeKeySet(start, end, suffix, value []byte, _ *WriteOptions) error {
suffixValues := [1]rangekey.SuffixValue{{Suffix: suffix, Value: value}}
internalValueLen := rangekey.EncodedSetValueLen(end, suffixValues[:])

deferredOp := b.rangeKeySetDeferred(len(start), internalValueLen)
copy(deferredOp.Key, start)
n := rangekey.EncodeSetValue(deferredOp.Value, end, suffixValues[:])
if n != internalValueLen {
panic("unexpected internal value length mismatch")
}

// Manually inline DeferredBatchOp.Finish().
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
}
}
return nil
}

func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeySet)
b.incrementRangeKeysCount()
return &b.deferredOp
}

func (b *Batch) incrementRangeKeysCount() {
b.countRangeKeys++
if b.index != nil {
b.rangeKeys = nil
// Range keys are rare, so we lazily allocate the index for them.
if b.rangeKeyIndex == nil {
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
}
b.deferredOp.index = b.rangeKeyIndex
}
}

// RangeKeyUnset removes a range key mapping the key range [start, end) at the
// MVCC timestamp suffix. The suffix may be omitted to remove an unsuffixed
// range key. RangeKeyUnset only removes portions of range keys that fall within
// the [start, end) key span, and only range keys with suffixes that exactly
// match the unset suffix.
//
// It is safe to modify the contents of the arguments after RangeKeyUnset
// returns.
//
// WARNING: This is an experimental feature with limited functionality.
func (b ExperimentalBatch) RangeKeyUnset(start, end, suffix []byte, _ *WriteOptions) error {
suffixes := [1][]byte{suffix}
internalValueLen := rangekey.EncodedUnsetValueLen(end, suffixes[:])

deferredOp := b.rangeKeyUnsetDeferred(len(start), internalValueLen)
copy(deferredOp.Key, start)
n := rangekey.EncodeUnsetValue(deferredOp.Value, end, suffixes[:])
if n != internalValueLen {
panic("unexpected internal value length mismatch")
}

// Manually inline DeferredBatchOp.Finish()
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
}
}
return nil
}

func (b *Batch) rangeKeyUnsetDeferred(startLen, internalValueLen int) *DeferredBatchOp {
b.prepareDeferredKeyValueRecord(startLen, internalValueLen, InternalKeyKindRangeKeyUnset)
b.incrementRangeKeysCount()
return &b.deferredOp
}

// RangeKeyDelete deletes all of the range keys in the range [start,end)
// (inclusive on start, exclusive on end). It does not delete point keys (for
// that use DeleteRange). RangeKeyDelete removes all range keys within the
// bounds, including those with or without suffixes.
//
// It is safe to modify the contents of the arguments after RangeKeyDelete
// returns.
//
// WARNING: This is an experimental feature with limited functionality.
func (b ExperimentalBatch) RangeKeyDelete(start, end []byte, _ *WriteOptions) error {
deferredOp := b.RangeKeyDeleteDeferred(len(start), len(end))
copy(deferredOp.Key, start)
copy(deferredOp.Value, end)
// Manually inline DeferredBatchOp.Finish().
if deferredOp.index != nil {
if err := deferredOp.index.Add(deferredOp.offset); err != nil {
// We never add duplicate entries, so an error should never occur.
panic(err)
}
}
return nil
}

// RangeKeyDeleteDeferred is similar to RangeKeyDelete in that it adds an
// operation to delete range keys to the batch, except it only takes in key
// lengths instead of complete slices, letting the caller encode into those
// objects and then call Finish() on the returned object. Note that
// DeferredBatchOp.Key should be populated with the start key, and
// DeferredBatchOp.Value should be populated with the end key.
func (b ExperimentalBatch) RangeKeyDeleteDeferred(startLen, endLen int) *DeferredBatchOp {
b.prepareDeferredKeyValueRecord(startLen, endLen, InternalKeyKindRangeKeyDelete)
b.incrementRangeKeysCount()
return &b.deferredOp
}

// LogData adds the specified to the batch. The data will be written to the
// WAL, but not added to memtables or sstables. Log data is never indexed,
// which makes it useful for testing WAL performance.
Expand Down Expand Up @@ -913,7 +1080,7 @@ func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, o
return 0, nil, nil, false
}
switch kind {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete, InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
*r, value, ok = batchDecodeStr(*r)
if !ok {
return 0, nil, nil, false
Expand Down
Loading