From 40f44c83ebab39946cfaffb9d421671632de497a Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 28 Oct 2021 13:39:27 -0400 Subject: [PATCH] internal/keyspan: add Value to Span Add a Value to the `Span` type, copying it to both fragments at a fragmentation point. This will be used in the implementation of range keys, which will be fragmented but carry a value. --- batch.go | 4 +- compaction.go | 2 +- compaction_iter_test.go | 5 +- internal/keyspan/fragmenter.go | 81 ++++++++------- internal/keyspan/fragmenter_test.go | 109 +++++++++++++------- internal/keyspan/seek_test.go | 6 +- internal/keyspan/span.go | 3 +- internal/keyspan/testdata/fragmenter | 2 +- internal/keyspan/testdata/fragmenter_values | 88 ++++++++++++++++ internal/keyspan/truncate_test.go | 6 +- level_checker_test.go | 2 +- level_iter_test.go | 2 +- mem_table.go | 2 +- merging_iter_test.go | 2 +- sstable/data_test.go | 2 +- sstable/reader.go | 3 +- 16 files changed, 221 insertions(+), 98 deletions(-) create mode 100644 internal/keyspan/testdata/fragmenter_values diff --git a/batch.go b/batch.go index 760ab4c4ff..d2c8c62ff5 100644 --- a/batch.go +++ b/batch.go @@ -720,7 +720,7 @@ func (b *Batch) newRangeDelIter(o *IterOptions) internalIterator { // tombstones will remain valid, pointing into the old Batch.data. GC for // the win. for key, val := it.First(); key != nil; key, val = it.Next() { - frag.Add(*key, val) + frag.Add(keyspan.Span{Start: *key, End: val}) } frag.Finish() } @@ -1153,7 +1153,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) *flushableBatch { index: -1, } for key, val := it.First(); key != nil; key, val = it.Next() { - frag.Add(*key, val) + frag.Add(keyspan.Span{Start: *key, End: val}) } frag.Finish() } diff --git a/compaction.go b/compaction.go index b32f9ebef9..438ea93835 100644 --- a/compaction.go +++ b/compaction.go @@ -2436,7 +2436,7 @@ func (d *DB) runCompaction( // written later during `finishOutput()`. We add them to the // `Fragmenter` now to make them visible to `compactionIter` so covered // keys in the same snapshot stripe can be elided. - c.rangeDelFrag.Add(iter.cloneKey(*key), val) + c.rangeDelFrag.Add(keyspan.Span{Start: iter.cloneKey(*key), End: val}) continue } if tw == nil { diff --git a/compaction_iter_test.go b/compaction_iter_test.go index dd79b8ed28..fe4da0b1c5 100644 --- a/compaction_iter_test.go +++ b/compaction_iter_test.go @@ -203,7 +203,10 @@ func TestCompactionIter(t *testing.T) { if iter.Valid() { fmt.Fprintf(&b, "%s:%s\n", iter.Key(), iter.Value()) if iter.Key().Kind() == InternalKeyKindRangeDelete { - iter.rangeDelFrag.Add(iter.cloneKey(iter.Key()), iter.Value()) + iter.rangeDelFrag.Add(keyspan.Span{ + Start: iter.cloneKey(iter.Key()), + End: iter.Value(), + }) } } else if err := iter.Error(); err != nil { fmt.Fprintf(&b, "err=%v\n", err) diff --git a/internal/keyspan/fragmenter.go b/internal/keyspan/fragmenter.go index 5dfd68d358..d3826ec00c 100644 --- a/internal/keyspan/fragmenter.go +++ b/internal/keyspan/fragmenter.go @@ -172,23 +172,22 @@ func (f *Fragmenter) checkInvariants(buf []Span) { // // This process continues until there are no more fragments to flush. // -// WARNING: the slices backing start.UserKey and end are retained after -// this method returns and should not be modified. This is safe for -// spans that are added from a memtable or batch. It is not safe for a -// range deletion span added from an sstable where the range-del block -// has been prefix compressed. -func (f *Fragmenter) Add(start base.InternalKey, end []byte) { +// WARNING: the slices backing start.UserKey and end are retained after this +// method returns and should not be modified. This is safe for spans that are +// added from a memtable or batch. It is not safe for a range deletion span +// added from an sstable where the range-del block has been prefix compressed. +func (f *Fragmenter) Add(s Span) { if f.finished { panic("pebble: span fragmenter already finished") } if f.flushedKey != nil { - switch c := f.Cmp(start.UserKey, f.flushedKey); { + switch c := f.Cmp(s.Start.UserKey, f.flushedKey); { case c < 0: panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)", - f.Format(start.UserKey), f.Format(f.flushedKey))) + f.Format(s.Start.UserKey), f.Format(f.flushedKey))) } } - if f.Cmp(start.UserKey, end) >= 0 { + if f.Cmp(s.Start.UserKey, s.End) >= 0 { // An empty span, we can ignore it. return } @@ -200,29 +199,23 @@ func (f *Fragmenter) Add(start base.InternalKey, end []byte) { if len(f.pending) > 0 { // Since all of the pending spans have the same start key, we only need // to compare against the first one. - switch c := f.Cmp(f.pending[0].Start.UserKey, start.UserKey); { + switch c := f.Cmp(f.pending[0].Start.UserKey, s.Start.UserKey); { case c > 0: panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s", - f.pending[0].Start.Pretty(f.Format), start.Pretty(f.Format))) + f.pending[0].Start.Pretty(f.Format), s.Start.Pretty(f.Format))) case c == 0: // The new span has the same start key as the existing pending // spans. Add it to the pending buffer. - f.pending = append(f.pending, Span{ - Start: start, - End: end, - }) + f.pending = append(f.pending, s) return } // At this point we know that the new start key is greater than the pending // spans start keys. - f.truncateAndFlush(start.UserKey) + f.truncateAndFlush(s.Start.UserKey) } - f.pending = append(f.pending, Span{ - Start: start, - End: end, - }) + f.pending = append(f.pending, s) } // Covers returns true if the specified key is covered by one of the pending @@ -243,12 +236,12 @@ func (f *Fragmenter) Covers(key base.InternalKey, snapshot uint64) bool { } seqNum := key.SeqNum() - for _, t := range f.pending { - if f.Cmp(key.UserKey, t.End) < 0 { + for _, s := range f.pending { + if f.Cmp(key.UserKey, s.End) < 0 { // NB: A range deletion tombstone does not delete a point operation // at the same sequence number, and broadly a span is not considered // to cover a point operation at the same sequence number. - if t.Start.Visible(snapshot) && t.Start.SeqNum() > seqNum { + if s.Start.Visible(snapshot) && s.Start.SeqNum() > seqNum { return true } } @@ -261,9 +254,10 @@ func (f *Fragmenter) Empty() bool { return f.finished || len(f.pending) == 0 } -// FlushTo flushes all of the fragments before key. Used during compaction to -// force emitting of spans which straddle an sstable boundary. Note that the -// emitted spans are not truncated to the specified key. Consider the scenario: +// FlushTo flushes all of the fragments with a start key <= key. Used during +// compaction to force emitting of spans which straddle an sstable boundary. +// Note that the emitted spans are not truncated to the specified key. Consider +// the scenario: // // a---------k#10 // f#8 @@ -319,13 +313,14 @@ func (f *Fragmenter) FlushTo(key []byte) { // would become empty. pending := f.pending f.pending = f.pending[:0] - for _, t := range pending { - if f.Cmp(key, t.End) < 0 { - // t: a--+--e + for _, s := range pending { + if f.Cmp(key, s.End) < 0 { + // s: a--+--e // new: c------ f.pending = append(f.pending, Span{ - Start: base.MakeInternalKey(key, t.Start.SeqNum(), t.Start.Kind()), - End: t.End, + Start: base.MakeInternalKey(key, s.Start.SeqNum(), s.Start.Kind()), + End: s.End, + Value: s.Value, }) } } @@ -407,21 +402,26 @@ func (f *Fragmenter) truncateAndFlush(key []byte) { // pending and f.pending share the same underlying storage. As we iterate // over pending we append to f.pending, but only one entry is appended in // each iteration, after we have read the entry being overwritten. - for _, t := range pending { - if f.Cmp(key, t.End) < 0 { - // t: a--+--e + for _, s := range pending { + if f.Cmp(key, s.End) < 0 { + // s: a--+--e // new: c------ - if f.Cmp(t.Start.UserKey, key) < 0 { - done = append(done, Span{Start: t.Start, End: key}) + if f.Cmp(s.Start.UserKey, key) < 0 { + done = append(done, Span{ + Start: s.Start, + End: key, + Value: s.Value, + }) } f.pending = append(f.pending, Span{ - Start: base.MakeInternalKey(key, t.Start.SeqNum(), t.Start.Kind()), - End: t.End, + Start: base.MakeInternalKey(key, s.Start.SeqNum(), s.Start.Kind()), + End: s.End, + Value: s.Value, }) } else { - // t: a-----e + // s: a-----e // new: e---- - done = append(done, t) + done = append(done, s) } } @@ -469,6 +469,7 @@ func (f *Fragmenter) flush(buf []Span, lastKey []byte) { f.flushBuf = append(f.flushBuf, Span{ Start: buf[i].Start, End: split, + Value: buf[i].Value, }) } diff --git a/internal/keyspan/fragmenter_test.go b/internal/keyspan/fragmenter_test.go index d2a54696c9..a044e7110f 100644 --- a/internal/keyspan/fragmenter_test.go +++ b/internal/keyspan/fragmenter_test.go @@ -17,30 +17,31 @@ import ( "github.com/stretchr/testify/require" ) -var tombstoneRe = regexp.MustCompile(`(\d+):\s*(\w+)-*(\w+)`) +var spanRe = regexp.MustCompile(`(\d+):\s*(\w+)-*(\w+)\w*([^\n]*)`) -func parseTombstone(t *testing.T, s string) Span { - m := tombstoneRe.FindStringSubmatch(s) - if len(m) != 4 { - t.Fatalf("expected 4 components, but found %d: %s", len(m), s) +func parseSpan(t *testing.T, s string, kind base.InternalKeyKind) Span { + m := spanRe.FindStringSubmatch(s) + if len(m) != 5 { + t.Fatalf("expected 5 components, but found %d: %s", len(m), s) } seqNum, err := strconv.Atoi(m[1]) require.NoError(t, err) return Span{ - Start: base.MakeInternalKey([]byte(m[2]), uint64(seqNum), base.InternalKeyKindRangeDelete), + Start: base.MakeInternalKey([]byte(m[2]), uint64(seqNum), kind), End: []byte(m[3]), + Value: []byte(strings.TrimSpace(m[4])), } } -func buildTombstones( - t *testing.T, cmp base.Compare, formatKey base.FormatKey, s string, +func buildSpans( + t *testing.T, cmp base.Compare, formatKey base.FormatKey, s string, kind base.InternalKeyKind, ) []Span { - var tombstones []Span + var spans []Span f := &Fragmenter{ Cmp: cmp, Format: formatKey, Emit: func(fragmented []Span) { - tombstones = append(tombstones, fragmented...) + spans = append(spans, fragmented...) }, } for _, line := range strings.Split(s, "\n") { @@ -60,14 +61,13 @@ func buildTombstones( continue } - t := parseTombstone(t, line) - f.Add(t.Start, t.End) + f.Add(parseSpan(t, line, kind)) } f.Finish() - return tombstones + return spans } -func formatTombstones(tombstones []Span) string { +func formatSpans(spans []Span) string { isLetter := func(b []byte) bool { if len(b) != 1 { return false @@ -76,21 +76,25 @@ func formatTombstones(tombstones []Span) string { } var buf bytes.Buffer - for _, v := range tombstones { - if v.Empty() { - fmt.Fprintf(&buf, "\n") - continue + for _, v := range spans { + switch { + case v.Empty(): + fmt.Fprintf(&buf, "") + case !isLetter(v.Start.UserKey) || !isLetter(v.End) || v.Start.UserKey[0] == v.End[0]: + fmt.Fprintf(&buf, "%d: %s-%s", v.Start.SeqNum(), v.Start.UserKey, v.End) + default: + fmt.Fprintf(&buf, "%d: %s%s%s%s", + v.Start.SeqNum(), + strings.Repeat(" ", int(v.Start.UserKey[0]-'a')), + v.Start.UserKey, + strings.Repeat("-", int(v.End[0]-v.Start.UserKey[0]-1)), + v.End) } - if !isLetter(v.Start.UserKey) || !isLetter(v.End) || v.Start.UserKey[0] == v.End[0] { - fmt.Fprintf(&buf, "%d: %s-%s\n", v.Start.SeqNum(), v.Start.UserKey, v.End) - continue + if len(v.Value) > 0 { + buf.WriteString(strings.Repeat(" ", int('z'-v.End[0]+1))) + buf.WriteString(string(v.Value)) } - fmt.Fprintf(&buf, "%d: %s%s%s%s\n", - v.Start.SeqNum(), - strings.Repeat(" ", int(v.Start.UserKey[0]-'a')), - v.Start.UserKey, - strings.Repeat("-", int(v.End[0]-v.Start.UserKey[0]-1)), - v.End) + buf.WriteRune('\n') } return buf.String() } @@ -114,12 +118,12 @@ func TestFragmenter(t *testing.T) { var iter base.InternalIterator // Returns true if the specified pair is deleted at the specified - // read sequence number. Get ignores tombstones newer than the read sequence + // read sequence number. Get ignores spans newer than the read sequence // number. This is a simple version of what full processing of range // tombstones looks like. deleted := func(key []byte, seq, readSeq uint64) bool { - tombstone := Get(cmp, iter, key, readSeq) - return tombstone.Covers(seq) + s := Get(cmp, iter, key, readSeq) + return s.Covers(seq) } datadriven.RunTest(t, "testdata/fragmenter", func(d *datadriven.TestData) string { @@ -132,9 +136,9 @@ func TestFragmenter(t *testing.T) { } }() - tombstones := buildTombstones(t, cmp, fmtKey, d.Input) - iter = NewIter(cmp, tombstones) - return formatTombstones(tombstones) + spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete) + iter = NewIter(cmp, spans) + return formatSpans(spans) }() case "get": @@ -178,8 +182,8 @@ func TestFragmenterDeleted(t *testing.T) { for _, line := range strings.Split(d.Input, "\n") { switch { case strings.HasPrefix(line, "add "): - t := parseTombstone(t, strings.TrimPrefix(line, "add ")) - f.Add(t.Start, t.End) + t := parseSpan(t, strings.TrimPrefix(line, "add "), base.InternalKeyKindRangeDelete) + f.Add(t) case strings.HasPrefix(line, "deleted "): key := base.ParseInternalKey(strings.TrimPrefix(line, "deleted ")) func() { @@ -214,8 +218,8 @@ func TestFragmenterFlushTo(t *testing.T) { } }() - tombstones := buildTombstones(t, cmp, fmtKey, d.Input) - return formatTombstones(tombstones) + spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete) + return formatSpans(spans) }() default: @@ -238,8 +242,35 @@ func TestFragmenterTruncateAndFlushTo(t *testing.T) { } }() - tombstones := buildTombstones(t, cmp, fmtKey, d.Input) - return formatTombstones(tombstones) + spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete) + return formatSpans(spans) + }() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) +} + +func TestFragmenter_Values(t *testing.T) { + cmp := base.DefaultComparer.Compare + fmtKey := base.DefaultComparer.FormatKey + + datadriven.RunTest(t, "testdata/fragmenter_values", func(d *datadriven.TestData) string { + switch d.Cmd { + case "build": + return func() (result string) { + defer func() { + if r := recover(); r != nil { + result = fmt.Sprint(r) + } + }() + + // TODO(jackson): Keys of kind InternalKeyKindRangeDelete don't + // have values. Update the call below when we have KindRangeSet, + // KindRangeUnset. + spans := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete) + return formatSpans(spans) }() default: diff --git a/internal/keyspan/seek_test.go b/internal/keyspan/seek_test.go index 0a4737b60c..7321968832 100644 --- a/internal/keyspan/seek_test.go +++ b/internal/keyspan/seek_test.go @@ -67,9 +67,9 @@ func TestSeek(t *testing.T) { datadriven.RunTest(t, "testdata/seek", func(d *datadriven.TestData) string { switch d.Cmd { case "build": - tombstones := buildTombstones(t, cmp, fmtKey, d.Input) + tombstones := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete) iter.Iter = NewIter(cmp, tombstones) - return formatTombstones(tombstones) + return formatSpans(tombstones) case "seek-ge", "seek-le": seek := SeekGE @@ -88,7 +88,7 @@ func TestSeek(t *testing.T) { return err.Error() } tombstone := seek(cmp, iter, []byte(parts[0]), seq) - fmt.Fprintf(&buf, "%s", strings.TrimSpace(formatTombstones([]Span{tombstone}))) + fmt.Fprintf(&buf, "%s", strings.TrimSpace(formatSpans([]Span{tombstone}))) fmt.Fprintf(&buf, "\n") } return buf.String() diff --git a/internal/keyspan/span.go b/internal/keyspan/span.go index 8c9ba50190..e241423102 100644 --- a/internal/keyspan/span.go +++ b/internal/keyspan/span.go @@ -12,7 +12,7 @@ import ( ) // Span represents a key-kind over a span of user keys, along with the -// sequence number at which it was written. +// sequence number at which it was written and optionally a value. // // Conceptually it represents (key-kind, [start, end))#seqnum. The key // kind, start and sequence number are stored in a base.InternalKey @@ -26,6 +26,7 @@ import ( type Span struct { Start base.InternalKey End []byte + Value []byte // optional } // Overlaps returns 0 if this span overlaps the other, -1 if there's no diff --git a/internal/keyspan/testdata/fragmenter b/internal/keyspan/testdata/fragmenter index 38e4f180a5..5af5eebf7e 100644 --- a/internal/keyspan/testdata/fragmenter +++ b/internal/keyspan/testdata/fragmenter @@ -22,7 +22,7 @@ build 2: j--m 1: j--m 2: m-----s -1: m-----s------z +1: m-----s 1: s------z ---- 3: a----f diff --git a/internal/keyspan/testdata/fragmenter_values b/internal/keyspan/testdata/fragmenter_values new file mode 100644 index 0000000000..8e2e5fcbc8 --- /dev/null +++ b/internal/keyspan/testdata/fragmenter_values @@ -0,0 +1,88 @@ +build +3: a-----------m apples +2: f------------s bananas +1: j---------------z coconuts +---- +3: a----f apples +3: f---j apples +2: f---j bananas +3: j--m apples +2: j--m bananas +1: j--m coconuts +2: m-----s bananas +1: m-----s coconuts +1: s------z coconuts + +# Building is idempotent. +build +3: a----f a +3: f---j b +2: f---j c +3: j--m d +2: j--m e +1: j--m f +2: m-----s g +1: m-----s h +1: s------z i +---- +3: a----f a +3: f---j b +2: f---j c +3: j--m d +2: j--m e +1: j--m f +2: m-----s g +1: m-----s h +1: s------z i + +build +2: a--c apple +1: b--d banana +truncate-and-flush-to c +---- +2: ab apple +2: bc apple +1: bc banana +1: cd banana + +build +3: a-c apple +2: a---e banana +1: a-----g coconut +truncate-and-flush-to d +3: d----i orange +---- +3: a-c apple +2: a-c banana +1: a-c coconut +2: cd banana +1: cd coconut +3: de orange +2: de banana +1: de coconut +3: e-g orange +1: e-g coconut +3: g-i orange + +# NB: Unlike the above truncate-and-flush-to calls, a flush-to does not truncate +# the end boundary. In this case, the fragments beginning at `c` are not +# truncated to `d`, they're flushed with the bounadries formed by fragmentation +# (`e`) +build +3: a-c apple +2: a---e banana +1: a-----g coconut +flush-to d +3: d----i orange +---- +3: a-c apple +2: a-c banana +1: a-c coconut +2: c-e banana +1: c-e coconut +3: de orange +2: de banana +1: de coconut +3: e-g orange +1: e-g coconut +3: g-i orange diff --git a/internal/keyspan/truncate_test.go b/internal/keyspan/truncate_test.go index 66e7e9852d..005ef25ed2 100644 --- a/internal/keyspan/truncate_test.go +++ b/internal/keyspan/truncate_test.go @@ -21,9 +21,9 @@ func TestTruncate(t *testing.T) { datadriven.RunTest(t, "testdata/truncate", func(d *datadriven.TestData) string { switch d.Cmd { case "build": - tombstones := buildTombstones(t, cmp, fmtKey, d.Input) + tombstones := buildSpans(t, cmp, fmtKey, d.Input, base.InternalKeyKindRangeDelete) iter = NewIter(cmp, tombstones) - return formatTombstones(tombstones) + return formatSpans(tombstones) case "truncate": if len(d.Input) > 0 { @@ -53,7 +53,7 @@ func TestTruncate(t *testing.T) { upper := []byte(parts[1]) truncated := Truncate(cmp, iter, lower, upper, startKey, endKey) - return formatTombstones(truncated.spans) + return formatSpans(truncated.spans) default: return fmt.Sprintf("unknown command: %s", d.Cmd) diff --git a/level_checker_test.go b/level_checker_test.go index 2746d1fdc3..31d9828ed7 100644 --- a/level_checker_test.go +++ b/level_checker_test.go @@ -182,7 +182,7 @@ func TestCheckLevelsCornerCases(t *testing.T) { err = w.Add(ikey, value) break } - frag.Add(ikey, value) + frag.Add(keyspan.Span{Start: ikey, End: value}) default: err = w.Add(ikey, value) } diff --git a/level_iter_test.go b/level_iter_test.go index 5fb9f6e367..24adaf3221 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -202,7 +202,7 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string { value := []byte(key[j+1:]) switch ikey.Kind() { case InternalKeyKindRangeDelete: - f.Add(ikey, value) + f.Add(keyspan.Span{Start: ikey, End: value}) default: if err := w.Add(ikey, value); err != nil { return err.Error() diff --git a/mem_table.go b/mem_table.go index de11439c91..50af703a2c 100644 --- a/mem_table.go +++ b/mem_table.go @@ -295,7 +295,7 @@ func (f *keySpanFrags) get( } it := skl.NewIter(nil, nil) for key, val := it.First(); key != nil; key, val = it.Next() { - frag.Add(*key, val) + frag.Add(keyspan.Span{Start: *key, End: val}) } frag.Finish() }) diff --git a/merging_iter_test.go b/merging_iter_test.go index d639fd58c9..288c9243c0 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -209,7 +209,7 @@ func TestMergingIterCornerCases(t *testing.T) { value := []byte(kv[j+1:]) switch ikey.Kind() { case InternalKeyKindRangeDelete: - frag.Add(ikey, value) + frag.Add(keyspan.Span{Start: ikey, End: value}) default: if err := w.Add(ikey, value); err != nil { return err.Error() diff --git a/sstable/data_test.go b/sstable/data_test.go index 6d62f8925d..959bbdb7d6 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -78,7 +78,7 @@ func runBuildCmd( err = errors.Errorf("%v", r) } }() - f.Add(key, value) + f.Add(keyspan.Span{Start: key, End: value}) }() if err != nil { return nil, nil, err diff --git a/sstable/reader.go b/sstable/reader.go index e167b88889..50362ca4d8 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -2287,8 +2287,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { }, } for i := range tombstones { - t := &tombstones[i] - frag.Add(t.Start, t.End) + frag.Add(tombstones[i]) } frag.Finish()