Skip to content

Commit

Permalink
sstable: add block level suffix replacement support
Browse files Browse the repository at this point in the history
  • Loading branch information
msbutler committed Jan 23, 2024
1 parent cd23876 commit a63110b
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 36 deletions.
120 changes: 115 additions & 5 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,28 +412,37 @@ type blockIter struct {
}
hideObsoletePoints bool
SyntheticPrefix
syntheticSuffix *SyntheticSuffix
}

// blockIter implements the base.InternalIterator interface.
var _ base.InternalIterator = (*blockIter)(nil)

func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) {
func newBlockIter(
cmp Compare, block block, syntheticPrefix SyntheticPrefix, syntheticSuffix *SyntheticSuffix,
) (*blockIter, error) {
i := &blockIter{}
return i, i.init(cmp, block, 0, false, syntheticPrefix)
return i, i.init(cmp, block, 0, false, syntheticPrefix, syntheticSuffix)
}

func (i *blockIter) String() string {
return "block"
}

func (i *blockIter) init(
cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
cmp Compare,
block block,
globalSeqNum uint64,
hideObsoletePoints bool,
syntheticPrefix SyntheticPrefix,
syntheticSuffix *SyntheticSuffix,
) error {
numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:]))
if numRestarts == 0 {
return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)")
}
i.SyntheticPrefix = syntheticPrefix
i.syntheticSuffix = syntheticSuffix
i.cmp = cmp
i.restarts = int32(len(block)) - 4*(1+numRestarts)
i.numRestarts = numRestarts
Expand Down Expand Up @@ -464,11 +473,16 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
cmp Compare,
block bufferHandle,
globalSeqNum uint64,
hideObsoletePoints bool,
syntheticPrefix SyntheticPrefix,
syntheticSuffix *SyntheticSuffix,
) error {
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix)
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix, syntheticSuffix)
}

func (i *blockIter) invalidate() {
Expand Down Expand Up @@ -671,6 +685,21 @@ func (i *blockIter) decodeInternalKey(key []byte) (hiddenPoint bool) {
return hiddenPoint
}

func (i *blockIter) maybeReplaceSuffix(maybeFromCache bool) {
if i.syntheticSuffix != nil && i.ikey.UserKey != nil {
prefixLen := i.syntheticSuffix.split(i.ikey.UserKey)
if !maybeFromCache {
i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...)
return
}
// If ikey is cached or may get cached, we must de-reference
// UserKey before prefix replacement.
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:prefixLen]...)
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...)
i.ikey.UserKey = i.syntheticSuffix.buf
}
}

func (i *blockIter) clearCache() {
i.cached = i.cached[:0]
i.cachedBuf = i.cachedBuf[:0]
Expand Down Expand Up @@ -811,7 +840,14 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
if !i.valid() {
return nil, base.LazyValue{}
}
// Before conducting any user key comparison, we must replace the suffix.
i.maybeReplaceSuffix(false)

if !hiddenPoint && i.cmp(i.ikey.UserKey, key) >= 0 {
i.correctSeekOffByOneError(key)
if !i.valid() {
return nil, base.LazyValue{}
}
// Initialize i.lazyValue
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
Expand All @@ -826,12 +862,58 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
for i.Next(); i.valid(); i.Next() {
if i.cmp(i.ikey.UserKey, key) >= 0 {
// i.Next() has already initialized i.lazyValue.
i.correctSeekOffByOneError(key)
if !i.valid() {
return nil, base.LazyValue{}
}
return &i.ikey, i.lazyValue
}
}
return nil, base.LazyValue{}
}

func (i *blockIter) correctSeekOffByOneError(key []byte) {
if i.syntheticSuffix != nil && i.ikey.UserKey != nil {
// To seek in a block with suffix replacement, we rely on 2 invariants to correct a potential off by 1 error:
// - (1) each prefix has a unique suffix
// - (2) byte.Compare(replacementSuffix,originalSuffix) > 0, for all keys
//
// To illustrate the off by 1 error, consider the following block with a
// restart interval of 2 and replacement suffix of "4":
// - Pre-suffix replacement: blueapple@1, {blue}banana@3, redapple@2, {red}banana@1
// - Post-suffix replacement: blueapple@4, {blue}banana@4, redapple@4, {red}banana@4
//
// Suppose the client seeks with blueapple@3. Because timestamps sort in
// reverse chronological order (i.e. blueapple@3>blueapple@4), a seek on the
// block _before_ replacement would return blueapple@1, but _after_
// replacement we return bluebanana@4. So, calling i.Next() after the
// pre-replacement seek would return the right answer.
//
// More generally, let k be the seeking key, a_pre the original key we seek to
// before suffix replacement, a_post the suffix replaced original key, and b
// the correct key to seek to. The iterator must call i.Next() iff k > a_post
// && k <= a_pre to return b. Call this the off-by-1 condition. If the
// off-by-1 conidtion isn't hit, the iterator always returns b.
//
// Further, if the iter hits the off-by-1 condition, we know the correct key
// is exactly 1 key away from a_pre because of invariant (1): each prefix has
// a unique suffix. Further, the correct key is always returned after i.Next()
// because of invariant (2).
//
// Check if k <= a_pre
seekKeyGeqPre := bytes.Compare(key, i.ikey.UserKey) <= 0

// Check if k > a_post; Use the syntheticSufix buf to compute a_post, and
// only replace user key if off by 1 condition is satisfied.
i.maybeReplaceSuffix(false)
seekKeyLessPost := bytes.Compare(key, i.ikey.UserKey) > 0

if seekKeyGeqPre && seekKeyLessPost {
i.Next()
}
}
}

// SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the
// pebble package.
func (i *blockIter) SeekPrefixGE(
Expand Down Expand Up @@ -979,6 +1061,8 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
// of hidden keys we will be able to skip whole blocks (using block
// property filters) so we don't bother optimizing.
hiddenPoint := i.decodeInternalKey(i.key)
// This key may get put in the cache, so
i.maybeReplaceSuffix(true)

// NB: we don't use the hiddenPoint return value of decodeInternalKey
// since we want to stop as soon as we reach a key >= ikey.UserKey, so
Expand Down Expand Up @@ -1040,6 +1124,7 @@ func (i *blockIter) First() (*InternalKey, base.LazyValue) {
if hiddenPoint {
return i.Next()
}
i.maybeReplaceSuffix(false)
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
i.lazyValue = base.MakeInPlaceValue(i.val)
Expand Down Expand Up @@ -1082,6 +1167,7 @@ func (i *blockIter) Last() (*InternalKey, base.LazyValue) {
if hiddenPoint {
return i.Prev()
}
i.maybeReplaceSuffix(true)
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
i.lazyValue = base.MakeInPlaceValue(i.val)
Expand Down Expand Up @@ -1130,6 +1216,10 @@ start:
if hiddenPoint {
goto start
}
if i.syntheticSuffix != nil {
prefixLen := i.syntheticSuffix.split(i.ikey.UserKey)
i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...)
}
} else {
i.ikey.Trailer = uint64(InternalKeyKindInvalid)
i.ikey.UserKey = nil
Expand Down Expand Up @@ -1400,6 +1490,10 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue)
if i.globalSeqNum != 0 {
i.ikey.SetSeqNum(i.globalSeqNum)
}
if i.syntheticSuffix != nil {
prefixLen := i.syntheticSuffix.split(i.ikey.UserKey)
i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...)
}
} else {
i.ikey.Trailer = uint64(InternalKeyKindInvalid)
i.ikey.UserKey = nil
Expand Down Expand Up @@ -1458,6 +1552,14 @@ start:
if i.globalSeqNum != 0 {
i.ikey.SetSeqNum(i.globalSeqNum)
}
if i.syntheticSuffix != nil {
suffixLen := i.syntheticSuffix.split(i.ikey.UserKey)
// If ikey is cached or may get cached, we must de-reference
// UserKey before prefix replacement.
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...)
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...)
i.ikey.UserKey = i.syntheticSuffix.buf
}
} else {
i.ikey.Trailer = uint64(InternalKeyKindInvalid)
i.ikey.UserKey = nil
Expand Down Expand Up @@ -1535,6 +1637,14 @@ start:
// Use the cache.
goto start
}
if i.syntheticSuffix != nil {
suffixLen := i.syntheticSuffix.split(i.ikey.UserKey)
// If ikey is cached or may get cached, we must de-reference
// UserKey before prefix replacement.
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...)
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...)
i.ikey.UserKey = i.syntheticSuffix.buf
}
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
i.lazyValue = base.MakeInPlaceValue(i.val)
Expand Down
6 changes: 3 additions & 3 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) {

var blocks []int
var i int
iter, _ := newBlockIter(r.Compare, indexH.Get(), nil)
iter, _ := newBlockIter(r.Compare, indexH.Get(), nil, nil)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
if err != nil {
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
twoLevelIndex := r.Properties.IndexPartitions > 0
i, err := newBlockIter(r.Compare, bh.Get(), nil)
i, err := newBlockIter(r.Compare, bh.Get(), nil, nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
if err := subiter.init(
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil {
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix, r.syntheticSuffix); err != nil {
return err.Error()
}
for key, value := subiter.First(); key != nil; key, value = subiter.Next() {
Expand Down
Loading

0 comments on commit a63110b

Please sign in to comment.