Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/keyspan: add Value to Span #1382

Merged
merged 1 commit into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ func (b *Batch) newRangeDelIter(o *IterOptions) internalIterator {
// tombstones will remain valid, pointing into the old Batch.data. GC for
// the win.
for key, val := it.First(); key != nil; key, val = it.Next() {
frag.Add(*key, val)
frag.Add(keyspan.Span{Start: *key, End: val})
}
frag.Finish()
}
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) *flushableBatch {
index: -1,
}
for key, val := it.First(); key != nil; key, val = it.Next() {
frag.Add(*key, val)
frag.Add(keyspan.Span{Start: *key, End: val})
}
frag.Finish()
}
Expand Down
2 changes: 1 addition & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,7 +2436,7 @@ func (d *DB) runCompaction(
// written later during `finishOutput()`. We add them to the
// `Fragmenter` now to make them visible to `compactionIter` so covered
// keys in the same snapshot stripe can be elided.
c.rangeDelFrag.Add(iter.cloneKey(*key), val)
c.rangeDelFrag.Add(keyspan.Span{Start: iter.cloneKey(*key), End: val})
continue
}
if tw == nil {
Expand Down
5 changes: 4 additions & 1 deletion compaction_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ func TestCompactionIter(t *testing.T) {
if iter.Valid() {
fmt.Fprintf(&b, "%s:%s\n", iter.Key(), iter.Value())
if iter.Key().Kind() == InternalKeyKindRangeDelete {
iter.rangeDelFrag.Add(iter.cloneKey(iter.Key()), iter.Value())
iter.rangeDelFrag.Add(keyspan.Span{
Start: iter.cloneKey(iter.Key()),
End: iter.Value(),
})
}
} else if err := iter.Error(); err != nil {
fmt.Fprintf(&b, "err=%v\n", err)
Expand Down
81 changes: 41 additions & 40 deletions internal/keyspan/fragmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,22 @@ func (f *Fragmenter) checkInvariants(buf []Span) {
//
// This process continues until there are no more fragments to flush.
//
// WARNING: the slices backing start.UserKey and end are retained after
// this method returns and should not be modified. This is safe for
// spans that are added from a memtable or batch. It is not safe for a
// range deletion span added from an sstable where the range-del block
// has been prefix compressed.
func (f *Fragmenter) Add(start base.InternalKey, end []byte) {
// WARNING: the slices backing start.UserKey and end are retained after this
// method returns and should not be modified. This is safe for spans that are
// added from a memtable or batch. It is not safe for a range deletion span
// added from an sstable where the range-del block has been prefix compressed.
func (f *Fragmenter) Add(s Span) {
if f.finished {
panic("pebble: span fragmenter already finished")
}
if f.flushedKey != nil {
switch c := f.Cmp(start.UserKey, f.flushedKey); {
switch c := f.Cmp(s.Start.UserKey, f.flushedKey); {
case c < 0:
panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
f.Format(start.UserKey), f.Format(f.flushedKey)))
f.Format(s.Start.UserKey), f.Format(f.flushedKey)))
}
}
if f.Cmp(start.UserKey, end) >= 0 {
if f.Cmp(s.Start.UserKey, s.End) >= 0 {
// An empty span, we can ignore it.
return
}
Expand All @@ -200,29 +199,23 @@ func (f *Fragmenter) Add(start base.InternalKey, end []byte) {
if len(f.pending) > 0 {
// Since all of the pending spans have the same start key, we only need
// to compare against the first one.
switch c := f.Cmp(f.pending[0].Start.UserKey, start.UserKey); {
switch c := f.Cmp(f.pending[0].Start.UserKey, s.Start.UserKey); {
case c > 0:
panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
f.pending[0].Start.Pretty(f.Format), start.Pretty(f.Format)))
f.pending[0].Start.Pretty(f.Format), s.Start.Pretty(f.Format)))
case c == 0:
// The new span has the same start key as the existing pending
// spans. Add it to the pending buffer.
f.pending = append(f.pending, Span{
Start: start,
End: end,
})
f.pending = append(f.pending, s)
return
}

// At this point we know that the new start key is greater than the pending
// spans' start key.
f.truncateAndFlush(start.UserKey)
f.truncateAndFlush(s.Start.UserKey)
}

f.pending = append(f.pending, Span{
Start: start,
End: end,
})
f.pending = append(f.pending, s)
}

// Covers returns true if the specified key is covered by one of the pending
Expand All @@ -243,12 +236,12 @@ func (f *Fragmenter) Covers(key base.InternalKey, snapshot uint64) bool {
}

seqNum := key.SeqNum()
for _, t := range f.pending {
if f.Cmp(key.UserKey, t.End) < 0 {
for _, s := range f.pending {
if f.Cmp(key.UserKey, s.End) < 0 {
// NB: A range deletion tombstone does not delete a point operation
// at the same sequence number, and broadly a span is not considered
// to cover a point operation at the same sequence number.
if t.Start.Visible(snapshot) && t.Start.SeqNum() > seqNum {
if s.Start.Visible(snapshot) && s.Start.SeqNum() > seqNum {
return true
}
}
Expand All @@ -261,9 +254,10 @@ func (f *Fragmenter) Empty() bool {
return f.finished || len(f.pending) == 0
}

// FlushTo flushes all of the fragments before key. Used during compaction to
// force emitting of spans which straddle an sstable boundary. Note that the
// emitted spans are not truncated to the specified key. Consider the scenario:
// FlushTo flushes all of the fragments with a start key <= key. Used during
// compaction to force emitting of spans which straddle an sstable boundary.
// Note that the emitted spans are not truncated to the specified key. Consider
// the scenario:
//
// a---------k#10
// f#8
Expand Down Expand Up @@ -319,13 +313,14 @@ func (f *Fragmenter) FlushTo(key []byte) {
// would become empty.
pending := f.pending
f.pending = f.pending[:0]
for _, t := range pending {
if f.Cmp(key, t.End) < 0 {
// t: a--+--e
for _, s := range pending {
if f.Cmp(key, s.End) < 0 {
// s: a--+--e
// new: c------
f.pending = append(f.pending, Span{
Start: base.MakeInternalKey(key, t.Start.SeqNum(), t.Start.Kind()),
End: t.End,
Start: base.MakeInternalKey(key, s.Start.SeqNum(), s.Start.Kind()),
End: s.End,
Value: s.Value,
})
}
}
Expand Down Expand Up @@ -407,21 +402,26 @@ func (f *Fragmenter) truncateAndFlush(key []byte) {
// pending and f.pending share the same underlying storage. As we iterate
// over pending we append to f.pending, but only one entry is appended in
// each iteration, after we have read the entry being overwritten.
for _, t := range pending {
if f.Cmp(key, t.End) < 0 {
// t: a--+--e
for _, s := range pending {
if f.Cmp(key, s.End) < 0 {
// s: a--+--e
// new: c------
if f.Cmp(t.Start.UserKey, key) < 0 {
done = append(done, Span{Start: t.Start, End: key})
if f.Cmp(s.Start.UserKey, key) < 0 {
done = append(done, Span{
Start: s.Start,
End: key,
Value: s.Value,
})
}
f.pending = append(f.pending, Span{
Start: base.MakeInternalKey(key, t.Start.SeqNum(), t.Start.Kind()),
End: t.End,
Start: base.MakeInternalKey(key, s.Start.SeqNum(), s.Start.Kind()),
End: s.End,
Value: s.Value,
})
} else {
// t: a-----e
// s: a-----e
// new: e----
done = append(done, t)
done = append(done, s)
}
}

Expand Down Expand Up @@ -469,6 +469,7 @@ func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
f.flushBuf = append(f.flushBuf, Span{
Start: buf[i].Start,
End: split,
Value: buf[i].Value,
})
}

Expand Down
109 changes: 70 additions & 39 deletions internal/keyspan/fragmenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,31 @@ import (
"github.com/stretchr/testify/require"
)

var tombstoneRe = regexp.MustCompile(`(\d+):\s*(\w+)-*(\w+)`)
var spanRe = regexp.MustCompile(`(\d+):\s*(\w+)-*(\w+)\w*([^\n]*)`)

func parseTombstone(t *testing.T, s string) Span {
m := tombstoneRe.FindStringSubmatch(s)
if len(m) != 4 {
t.Fatalf("expected 4 components, but found %d: %s", len(m), s)
func parseSpan(t *testing.T, s string, kind base.InternalKeyKind) Span {
m := spanRe.FindStringSubmatch(s)
if len(m) != 5 {
t.Fatalf("expected 5 components, but found %d: %s", len(m), s)
}
seqNum, err := strconv.Atoi(m[1])
require.NoError(t, err)
return Span{
Start: base.MakeInternalKey([]byte(m[2]), uint64(seqNum), base.InternalKeyKindRangeDelete),
Start: base.MakeInternalKey([]byte(m[2]), uint64(seqNum), kind),
End: []byte(m[3]),
Value: []byte(strings.TrimSpace(m[4])),
}
}

func buildTombstones(
t *testing.T, cmp base.Compare, formatKey base.FormatKey, s string,
func buildSpans(
t *testing.T, cmp base.Compare, formatKey base.FormatKey, s string, kind base.InternalKeyKind,
) []Span {
var tombstones []Span
var spans []Span
f := &Fragmenter{
Cmp: cmp,
Format: formatKey,
Emit: func(fragmented []Span) {
tombstones = append(tombstones, fragmented...)
spans = append(spans, fragmented...)
},
}
for _, line := range strings.Split(s, "\n") {
Expand All @@ -60,14 +61,13 @@ func buildTombstones(
continue
}

t := parseTombstone(t, line)
f.Add(t.Start, t.End)
f.Add(parseSpan(t, line, kind))
}
f.Finish()
return tombstones
return spans
}

func formatTombstones(tombstones []Span) string {
func formatSpans(spans []Span) string {
isLetter := func(b []byte) bool {
if len(b) != 1 {
return false
Expand All @@ -76,21 +76,25 @@ func formatTombstones(tombstones []Span) string {
}

var buf bytes.Buffer
for _, v := range tombstones {
if v.Empty() {
fmt.Fprintf(&buf, "<empty>\n")
continue
for _, v := range spans {
switch {
case v.Empty():
fmt.Fprintf(&buf, "<empty>")
case !isLetter(v.Start.UserKey) || !isLetter(v.End) || v.Start.UserKey[0] == v.End[0]:
fmt.Fprintf(&buf, "%d: %s-%s", v.Start.SeqNum(), v.Start.UserKey, v.End)
default:
fmt.Fprintf(&buf, "%d: %s%s%s%s",
v.Start.SeqNum(),
strings.Repeat(" ", int(v.Start.UserKey[0]-'a')),
v.Start.UserKey,
strings.Repeat("-", int(v.End[0]-v.Start.UserKey[0]-1)),
v.End)
}
if !isLetter(v.Start.UserKey) || !isLetter(v.End) || v.Start.UserKey[0] == v.End[0] {
fmt.Fprintf(&buf, "%d: %s-%s\n", v.Start.SeqNum(), v.Start.UserKey, v.End)
continue
if len(v.Value) > 0 {
buf.WriteString(strings.Repeat(" ", int('z'-v.End[0]+1)))
buf.WriteString(string(v.Value))
}
fmt.Fprintf(&buf, "%d: %s%s%s%s\n",
v.Start.SeqNum(),
strings.Repeat(" ", int(v.Start.UserKey[0]-'a')),
v.Start.UserKey,
strings.Repeat("-", int(v.End[0]-v.Start.UserKey[0]-1)),
v.End)
buf.WriteRune('\n')
}
return buf.String()
}
Expand All @@ -114,12 +118,12 @@ func TestFragmenter(t *testing.T) {
var iter base.InternalIterator

// Returns true if the specified <key,seq> pair is deleted at the specified
// read sequence number. Get ignores tombstones newer than the read sequence
// read sequence number. Get ignores spans newer than the read sequence
// number. This is a simple version of what full processing of range
// tombstones looks like.
deleted := func(key []byte, seq, readSeq uint64) bool {
tombstone := Get(cmp, iter, key, readSeq)
return tombstone.Covers(seq)
s := Get(cmp, iter, key, readSeq)
return s.Covers(seq)
}

datadriven.RunTest(t, "testdata/fragmenter", func(d *datadriven.TestData) string {
Expand All @@ -132,9 +136,9 @@ func TestFragmenter(t *testing.T) {
}
}()

tombstones := buildTombstones(t, cmp, fmtKey, d.Input)
iter = NewIter(cmp, tombstones)
return formatTombstones(tombstones)
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
iter = NewIter(cmp, spans)
return formatSpans(spans)
}()

case "get":
Expand Down Expand Up @@ -178,8 +182,8 @@ func TestFragmenterDeleted(t *testing.T) {
for _, line := range strings.Split(d.Input, "\n") {
switch {
case strings.HasPrefix(line, "add "):
t := parseTombstone(t, strings.TrimPrefix(line, "add "))
f.Add(t.Start, t.End)
t := parseSpan(t, strings.TrimPrefix(line, "add "), base.InternalKeyKindRangeDelete)
f.Add(t)
case strings.HasPrefix(line, "deleted "):
key := base.ParseInternalKey(strings.TrimPrefix(line, "deleted "))
func() {
Expand Down Expand Up @@ -214,8 +218,8 @@ func TestFragmenterFlushTo(t *testing.T) {
}
}()

tombstones := buildTombstones(t, cmp, fmtKey, d.Input)
return formatTombstones(tombstones)
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
return formatSpans(spans)
}()

default:
Expand All @@ -238,8 +242,35 @@ func TestFragmenterTruncateAndFlushTo(t *testing.T) {
}
}()

tombstones := buildTombstones(t, cmp, fmtKey, d.Input)
return formatTombstones(tombstones)
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
return formatSpans(spans)
}()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
}

func TestFragmenter_Values(t *testing.T) {
cmp := base.DefaultComparer.Compare
fmtKey := base.DefaultComparer.FormatKey

datadriven.RunTest(t, "testdata/fragmenter_values", func(d *datadriven.TestData) string {
switch d.Cmd {
case "build":
return func() (result string) {
defer func() {
if r := recover(); r != nil {
result = fmt.Sprint(r)
}
}()

// TODO(jackson): Keys of kind InternalKeyKindRangeDelete don't
// have values. Update the call below when we have KindRangeSet,
// KindRangeUnset.
spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete)
return formatSpans(spans)
}()

default:
Expand Down
Loading