58045: storageccl: make KeyRewriter lookup O(logn) and memoized r=dt a=dt

The previous code was O(n) in the number of tables, and could be very slow when a restore involved a large number of tables.

On the two-prefix benchmark, the O(n) code clocked in at about 10ns/op. Sorting the prefixes allows binary searching to find a matching one, but on a benchmark this small, the binary-search version alone actually takes closer to 30ns/op.

Adding memoization should help considerably, both in the small benchmark and in real-world usage: keys are passed to the rewriter in order and will almost always be matched by the same table prefix, so memoizing just the last match likely has a nearly 100% hit rate and means the O(log n) lookup should only happen about once per range. A simplified sketch of the idea follows.
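
For illustration, a minimal standalone sketch of the sorted-prefix-plus-memoization idea (simplified types and names; the actual implementation is in key_rewriter.go below):

package main

import (
	"bytes"
	"fmt"
	"sort"
)

type rule struct{ old, new []byte }

// rewriter holds rules sorted by old prefix, plus the index of the last
// matching rule so consecutive keys with the same prefix skip the search.
type rewriter struct {
	rules []rule
	last  int
}

func (r *rewriter) lookup(key []byte) (rule, bool) {
	if len(r.rules) == 0 {
		return rule{}, false
	}
	// Memoized fast path: ordered input almost always hits the last rule.
	if bytes.HasPrefix(key, r.rules[r.last].old) {
		return r.rules[r.last], true
	}
	// O(log n) fallback: find the first rule that is a prefix of key or
	// sorts after it, then confirm it actually matches.
	i := sort.Search(len(r.rules), func(i int) bool {
		return bytes.HasPrefix(key, r.rules[i].old) || bytes.Compare(key, r.rules[i].old) < 0
	})
	if i == len(r.rules) || !bytes.HasPrefix(key, r.rules[i].old) {
		return rule{}, false
	}
	r.last = i
	return r.rules[i], true
}

func main() {
	r := rewriter{rules: []rule{
		{old: []byte{1, 2}, new: []byte{9, 9}},
		{old: []byte{3, 4}, new: []byte{8, 8}},
	}}
	_, ok := r.lookup([]byte{3, 4, 5})  // binary search on the first miss
	_, ok2 := r.lookup([]byte{3, 4, 6}) // memoized hit, no search
	fmt.Println(ok, ok2, r.last)        // true true 1
}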

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Dec 21, 2020
2 parents ba3b8ec + 66a46fe commit ff612f1
Showing 4 changed files with 49 additions and 30 deletions.
9 changes: 1 addition & 8 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -173,13 +173,6 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 	ctx := rd.Ctx
 	evalCtx := rd.EvalCtx
 	var summary roachpb.BulkOpSummary
-	// rekeys could be using table descriptors from either the old or new
-	// foreign key representation on the table descriptor, but this is fine
-	// because foreign keys don't matter for the key rewriter.
-	kr, err := storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys)
-	if err != nil {
-		return summary, errors.Wrap(err, "make key rewriter")
-	}
 
 	// The sstables only contain MVCC data and no intents, so using an MVCC
 	// iterator is sufficient.
@@ -285,7 +278,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 		value := roachpb.Value{RawBytes: valueScratch}
 		iter.NextKey()
 
-		key.Key, ok, err = kr.RewriteKey(key.Key, false /* isFromSpan */)
+		key.Key, ok, err = rd.kr.RewriteKey(key.Key, false /* isFromSpan */)
 		if err != nil {
 			return summary, err
 		}
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/import_test.go
@@ -297,10 +297,10 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
 	} {
 		t.Run(fmt.Sprintf("%d-%v", i, testCase), func(t *testing.T) {
 			newID := descpb.ID(100 + i)
-			kr := prefixRewriter{{
+			kr := prefixRewriter{rewrites: []prefixRewrite{{
 				OldPrefix: srcPrefix,
 				NewPrefix: makeKeyRewriterPrefixIgnoringInterleaved(newID, indexID),
-			}}
+			}}}
 			rekeys := []roachpb.ImportRequest_TableRekey{
 				{
 					OldID: oldID,
57 changes: 42 additions & 15 deletions pkg/ccl/storageccl/key_rewriter.go
@@ -10,6 +10,7 @@ package storageccl
 
 import (
 	"bytes"
+	"sort"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -25,29 +26,50 @@ import (
 type prefixRewrite struct {
 	OldPrefix []byte
 	NewPrefix []byte
+	noop      bool
 }
 
 // prefixRewriter is a matcher for an ordered list of pairs of byte prefix
 // rewrite rules.
-type prefixRewriter []prefixRewrite
+type prefixRewriter struct {
+	rewrites []prefixRewrite
+	last     int
+}
 
 // RewriteKey modifies key using the first matching rule and returns
 // it. If no rules matched, returns false and the original input key.
 func (p prefixRewriter) rewriteKey(key []byte) ([]byte, bool) {
-	for _, rewrite := range p {
-		if bytes.HasPrefix(key, rewrite.OldPrefix) {
-			if len(rewrite.OldPrefix) == len(rewrite.NewPrefix) {
-				copy(key[:len(rewrite.OldPrefix)], rewrite.NewPrefix)
-				return key, true
-			}
-			// TODO(dan): Special case when key's cap() is enough.
-			newKey := make([]byte, 0, len(rewrite.NewPrefix)+len(key)-len(rewrite.OldPrefix))
-			newKey = append(newKey, rewrite.NewPrefix...)
-			newKey = append(newKey, key[len(rewrite.OldPrefix):]...)
-			return newKey, true
-		}
-	}
-	return key, false
+	if len(p.rewrites) < 1 {
+		return key, false
+	}
+
+	found := p.last
+	if !bytes.HasPrefix(key, p.rewrites[found].OldPrefix) {
+		// since prefixes are sorted, we can binary search to find where a matching
+		// prefix would be. We use the predicate HasPrefix (what we want) or greater
+		// (after what we want) to search.
+		found = sort.Search(len(p.rewrites), func(i int) bool {
+			return bytes.HasPrefix(key, p.rewrites[i].OldPrefix) || bytes.Compare(key, p.rewrites[i].OldPrefix) < 0
+		})
+		if found == len(p.rewrites) || !bytes.HasPrefix(key, p.rewrites[found].OldPrefix) {
+			return key, false
+		}
+	}
+
+	p.last = found
+	rewrite := p.rewrites[found]
+	if rewrite.noop {
+		return key, true
+	}
+	if len(rewrite.OldPrefix) == len(rewrite.NewPrefix) {
+		copy(key[:len(rewrite.OldPrefix)], rewrite.NewPrefix)
+		return key, true
+	}
+	// TODO(dan): Special case when key's cap() is enough.
+	newKey := make([]byte, 0, len(rewrite.NewPrefix)+len(key)-len(rewrite.OldPrefix))
+	newKey = append(newKey, rewrite.NewPrefix...)
+	newKey = append(newKey, key[len(rewrite.OldPrefix):]...)
+	return newKey, true
 }
 
 // KeyRewriter rewrites old table IDs to new table IDs. It is able to descend
@@ -88,9 +110,10 @@ func MakeKeyRewriter(descs map[descpb.ID]*tabledesc.Immutable) (*KeyRewriter, error) {
 		newPrefix := roachpb.Key(makeKeyRewriterPrefixIgnoringInterleaved(desc.ID, index.ID))
 		if !seenPrefixes[string(oldPrefix)] {
 			seenPrefixes[string(oldPrefix)] = true
-			prefixes = append(prefixes, prefixRewrite{
+			prefixes.rewrites = append(prefixes.rewrites, prefixRewrite{
 				OldPrefix: oldPrefix,
 				NewPrefix: newPrefix,
+				noop:      bytes.Equal(oldPrefix, newPrefix),
 			})
 		}
 		// All the encoded data for a index will have the prefix just added, but
@@ -100,13 +123,17 @@ func MakeKeyRewriter(descs map[descpb.ID]*tabledesc.Immutable) (*KeyRewriter, error) {
 			newPrefix = newPrefix.PrefixEnd()
 			if !seenPrefixes[string(oldPrefix)] {
 				seenPrefixes[string(oldPrefix)] = true
-				prefixes = append(prefixes, prefixRewrite{
+				prefixes.rewrites = append(prefixes.rewrites, prefixRewrite{
 					OldPrefix: oldPrefix,
 					NewPrefix: newPrefix,
+					noop:      bytes.Equal(oldPrefix, newPrefix),
 				})
 			}
 		}
 	}
+	sort.Slice(prefixes.rewrites, func(i, j int) bool {
+		return bytes.Compare(prefixes.rewrites[i].OldPrefix, prefixes.rewrites[j].OldPrefix) < 0
+	})
 	return &KeyRewriter{
 		prefixes: prefixes,
 		descs:    descs,
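
To make the binary-search predicate above concrete, using the {1, 2, 3} → {4, 5, 6} and {7, 8, 9} → {10} pairs from the tests below: for key {7, 8, 9, 1}, the predicate is false at index 0 (the key neither has prefix {1, 2, 3} nor sorts before it) and true at index 1 (prefix match), so sort.Search returns 1. For a key like {5} that matches nothing, the predicate first passes at index 1 only via the sorts-before half, and the follow-up HasPrefix check then correctly reports no match.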
9 changes: 4 additions & 5 deletions pkg/ccl/storageccl/key_rewriter_test.go
@@ -27,7 +27,7 @@ import (
 func TestPrefixRewriter(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
-	kr := prefixRewriter{
+	kr := prefixRewriter{rewrites: []prefixRewrite{
 		{
 			OldPrefix: []byte{1, 2, 3},
 			NewPrefix: []byte{4, 5, 6},
@@ -36,8 +36,7 @@ func TestPrefixRewriter(t *testing.T) {
 		{
 			OldPrefix: []byte{7, 8, 9},
 			NewPrefix: []byte{10},
 		},
-	}
-
+	}}
 	t.Run("match", func(t *testing.T) {
 		key := []byte{1, 2, 3, 4}
 		newKey, ok := kr.rewriteKey(key)
@@ -159,7 +158,7 @@ func mustMarshalDesc(t *testing.T, tableDesc *descpb.TableDescriptor) []byte {
 }
 
 func BenchmarkPrefixRewriter(b *testing.B) {
-	kr := prefixRewriter{
+	kr := prefixRewriter{rewrites: []prefixRewrite{
 		{
 			OldPrefix: []byte{1, 2, 3},
 			NewPrefix: []byte{4, 5, 6},
@@ -168,7 +167,7 @@ func BenchmarkPrefixRewriter(b *testing.B) {
 			OldPrefix: []byte{7, 8, 9},
 			NewPrefix: []byte{10},
 		},
-	}
+	}}
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
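
For reference, assuming a checkout at this commit, an invocation along these lines should reproduce the numbers quoted in the commit message:

go test ./pkg/ccl/storageccl -run '^$' -bench BenchmarkPrefixRewriter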
