Skip to content

Commit

Permalink
storage: add MVCCExportFingerprint method and fingerprintWriter
Browse files Browse the repository at this point in the history
This change introduces a fingerprintWriter that
hashes every key/timestamp and value for point keys, and
combines their hashes via a XOR into a running aggregate.
Range keys are not fingerprinted but instead written to a pebble SST that is
returned to the caller. This is because range keys do not have a stable,
discrete identity and so it is up to the caller to define a deterministic
fingerprinting scheme across all returned range keys.

The fingerprintWriter is used by `MVCCExportFingerprint` that
exports a fingerprint for point keys in the keyrange
[StartKey, EndKey) over the interval (StartTS, EndTS].
The export logic used by `MVCCExportFingerprint` is the same
that drives `MVCCExportToSST`. The former writes to a fingerprintWriter
while the latter writes to an sstWriter. Currently, this
method only supports using an `fnv64` hasher to fingerprint each KV.

This change does not wire `MVCCExportFingerprint` to ExportRequest
command evaluation. This will be done as a followup.

Informs: #89336

Release note: None
  • Loading branch information
adityamaru committed Nov 9, 2022
1 parent 50ced42 commit 84cdd0b
Show file tree
Hide file tree
Showing 9 changed files with 1,212 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"doc.go",
"engine.go",
"engine_key.go",
"fingerprint_writer.go",
"in_mem.go",
"intent_interleaving_iter.go",
"intent_reader_writer.go",
Expand Down
146 changes: 146 additions & 0 deletions pkg/storage/fingerprint_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"context"
"hash"
"io"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/errors"
)

// fingerprintWriter is an ExportWriter that fingerprints point keys instead of
// persisting them: each point key's key/timestamp and value are hashed, and the
// per-key hashes are XOR-ed into a single running aggregate.
//
// Range keys are not part of the fingerprint. They are forwarded to a pebble
// SST that is handed back to the caller, because range keys have no stable,
// discrete identity; the caller must define its own deterministic
// fingerprinting scheme over the returned range keys.
//
// Callers must invoke Finish() and then Close() so that the writes to the
// underlying pebble SST are finalized.
type fingerprintWriter struct {
	// sstWriter receives the range keys. It is set to nil by Close.
	sstWriter *SSTWriter
	// xorAgg holds the running XOR of all point key hashes.
	xorAgg *uintXorAggregate

	// hasher computes the hash of a single point key; it is reset after every
	// key so that each hash is independent of the previous ones.
	hasher hash.Hash64
	// timestampBuf is scratch space reused across PutRawMVCC calls to hold the
	// encoded MVCC timestamp.
	timestampBuf []byte
}

// makeFingerprintWriter returns a fingerprintWriter that hashes point keys with
// the supplied hasher and writes range keys to a backup SST backed by f. The
// XOR aggregate starts out empty.
func makeFingerprintWriter(
	ctx context.Context, hasher hash.Hash64, cs *cluster.Settings, f io.Writer,
) fingerprintWriter {
	// TODO(adityamaru,dt): Once
	// https://github.com/cockroachdb/cockroach/issues/90450 has been addressed we
	// should write to a kvBuf instead of a Backup SST writer.
	backupSST := MakeBackupSSTWriter(ctx, cs, f)

	var fw fingerprintWriter
	fw.hasher = hasher
	fw.sstWriter = &backupSST
	fw.xorAgg = new(uintXorAggregate)
	return fw
}

// uintXorAggregate accumulates uint64 values by XOR-ing them together. The
// zero value is an empty aggregate whose result is 0.
type uintXorAggregate struct {
	sum uint64
}

// add folds x into the running XOR.
func (a *uintXorAggregate) add(x uint64) {
	a.sum ^= x
}

// result reports the XOR of every value added so far.
func (a *uintXorAggregate) result() uint64 {
	return a.sum
}

// Finish completes the underlying SSTWriter (when it holds any data) and
// returns the XOR-aggregated fingerprint of the point keys. If finishing the
// SST fails, the fingerprint is reported as 0 alongside the error.
func (f *fingerprintWriter) Finish() (uint64, error) {
	// An sstable to which no records were added is left uncompleted.
	if f.sstWriter.DataSize == 0 {
		return f.xorAgg.result(), nil
	}
	if err := f.sstWriter.Finish(); err != nil {
		return 0, err
	}
	return f.xorAgg.result(), nil
}

// Close releases the underlying SSTWriter, resets the hasher, and drops the
// XOR aggregate. Close is idempotent: once the SSTWriter has been cleared,
// further calls do nothing.
func (f *fingerprintWriter) Close() {
	if w := f.sstWriter; w != nil {
		w.Close()
		f.hasher.Reset()
		f.xorAgg, f.sstWriter = nil, nil
	}
}

// Compile-time check that *fingerprintWriter satisfies ExportWriter.
var _ ExportWriter = (*fingerprintWriter)(nil)

// PutRawMVCCRangeKey implements the ExportWriter interface. The range key is
// passed through to the underlying SSTWriter and does not contribute to the
// fingerprint.
func (f *fingerprintWriter) PutRawMVCCRangeKey(key MVCCRangeKey, bytes []byte) error {
	// Range keys have no stable, discrete identity, so rather than hashing
	// them here they are written to a Pebble SST and the caller is left to
	// define a deterministic fingerprinting scheme across all of them.
	return f.sstWriter.PutRawMVCCRangeKey(key, bytes)
}

// PutRawMVCC implements the ExportWriter interface. It hashes the key, the
// encoded timestamp, and the value (in that order) as a single unit and XORs
// the resulting hash into the running aggregate. Nothing is written to the
// underlying SST.
func (f *fingerprintWriter) PutRawMVCC(key MVCCKey, value []byte) error {
	// Leave the hasher clean for the next key regardless of the outcome.
	defer f.hasher.Reset()

	err := f.hash(key.Key)
	if err == nil {
		// Reuse the scratch buffer for the encoded timestamp.
		f.timestampBuf = EncodeMVCCTimestampToBuf(f.timestampBuf, key.Timestamp)
		err = f.hash(f.timestampBuf)
	}
	if err == nil {
		err = f.hash(value)
	}
	if err != nil {
		return err
	}

	f.xorAgg.add(f.hasher.Sum64())
	return nil
}

// PutUnversioned implements the ExportWriter interface. With no timestamp to
// include, only the key and the value are hashed (in that order) and the
// resulting hash is XOR-ed into the running aggregate.
func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error {
	// Leave the hasher clean for the next key regardless of the outcome.
	defer f.hasher.Reset()

	for _, part := range [2][]byte{key, value} {
		if err := f.hash(part); err != nil {
			return err
		}
	}

	f.xorAgg.add(f.hasher.Sum64())
	return nil
}

// hash feeds data into the writer's hasher. The hash package documents that
// Write never returns an error, so an error here is reported as an assertion
// failure that records the concrete type of the offending hasher.
func (f *fingerprintWriter) hash(data []byte) error {
	if _, err := f.hasher.Write(data); err != nil {
		// Report the hasher's type rather than the writer's: the writer is
		// always *fingerprintWriter, which says nothing about which hash
		// implementation misbehaved.
		return errors.NewAssertionErrorWithWrappedErrf(err,
			`"It never returns an error." -- https://golang.org/pkg/hash: %T`, f.hasher)
	}

	return nil
}
65 changes: 61 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"hash/fnv"
"io"
"math"
"runtime"
Expand Down Expand Up @@ -5766,6 +5767,35 @@ func MVCCIsSpanEmpty(
return !valid, nil
}

// MVCCExportFingerprint exports a fingerprint for point keys in the keyrange
// [StartKey, EndKey) over the interval (StartTS, EndTS]. Each key/timestamp and
// value is hashed using a fnv64 hasher, and combined into a running aggregate
// via a XOR. On completion of the export this aggregate is returned as the
// fingerprint.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller via dest. This is because range keys do not have a
// stable, discrete identity and so it is up to the caller to define a
// deterministic fingerprinting scheme across all returned range keys.
//
// The export itself is driven by mvccExportToWriter; its summary and resume
// key are returned unchanged. If the export fails, zero values are returned
// together with the error.
func MVCCExportFingerprint(
	ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, uint64, error) {
	// Name the span after this function so that traces can tell fingerprint
	// exports apart from SST exports.
	ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportFingerprint")
	defer span.Finish()

	hasher := fnv.New64()
	writer := makeFingerprintWriter(ctx, hasher, cs, dest)
	defer writer.Close()

	summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &writer)
	if err != nil {
		return roachpb.BulkOpSummary{}, MVCCKey{}, 0, err
	}

	fingerprint, err := writer.Finish()
	return summary, resumeKey, fingerprint, err
}

// MVCCExportToSST exports changes to the keyrange [StartKey, EndKey) over the
// interval (StartTS, EndTS] as a Pebble SST. See mvccExportToWriter for more
// details.
Expand All @@ -5791,6 +5821,36 @@ func MVCCExportToSST(
return summary, resumeKey, sstWriter.Finish()
}

// ExportWriter is a trimmed down version of the Writer interface. It contains
// only those methods used during ExportRequest command evaluation.
type ExportWriter interface {
	// PutRawMVCCRangeKey writes an MVCC range key with the provided encoded
	// MVCCValue. It will replace any overlapping range keys at the given
	// timestamp (even partial overlap). Only MVCC range tombstones, i.e. an empty
	// value, are currently allowed (other kinds will need additional handling in
	// MVCC APIs and elsewhere, e.g. stats and GC). It can be used to avoid
	// decoding and immediately re-encoding an MVCCValue, but should generally be
	// avoided due to the lack of type safety.
	//
	// It is safe to modify the contents of the arguments after PutRawMVCCRangeKey
	// returns.
	PutRawMVCCRangeKey(key MVCCRangeKey, value []byte) error
	// PutRawMVCC sets the given key to the encoded MVCCValue. It requires that
	// the timestamp is non-empty (see {PutUnversioned,PutIntent} if the timestamp
	// is empty). It can be used to avoid decoding and immediately re-encoding an
	// MVCCValue, but should generally be avoided due to the lack of type safety.
	//
	// It is safe to modify the contents of the arguments after PutRawMVCC
	// returns.
	PutRawMVCC(key MVCCKey, value []byte) error
	// PutUnversioned sets the given key to the value provided. It is for use
	// with inline metadata (not intents) and other unversioned keys (like
	// Range-ID local keys).
	//
	// It is safe to modify the contents of the arguments after PutUnversioned
	// returns.
	PutUnversioned(key roachpb.Key, value []byte) error
}

// mvccExportToWriter exports changes to the keyrange [StartKey, EndKey) over
// the interval (StartTS, EndTS] to the passed in writer. See MVCCExportOptions
// for options. StartTS may be zero.
Expand All @@ -5817,11 +5877,8 @@ func MVCCExportToSST(
// error is returned then the writer's contents are undefined. It is the
// responsibility of the caller to Finish() / Close() the passed in writer.
func mvccExportToWriter(
ctx context.Context, reader Reader, opts MVCCExportOptions, writer Writer,
ctx context.Context, reader Reader, opts MVCCExportOptions, writer ExportWriter,
) (roachpb.BulkOpSummary, MVCCKey, error) {
ctx, span := tracing.ChildSpan(ctx, "storage.mvccExportToWriter")
defer span.Finish()

// If we're not exporting all revisions then we can mask point keys below any
// MVCC range tombstones, since we don't care about them.
var rangeKeyMasking hlc.Timestamp
Expand Down
32 changes: 27 additions & 5 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var (
// put_rangekey ts=<int>[,<int>] [localTs=<int>[,<int>]] k=<key> end=<key>
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]]
// scan [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [end=<key>] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [max=<max>] [targetbytes=<target>] [allowEmpty]
// export [k=<key>] [end=<key>] [ts=<int>[,<int>]] [kTs=<int>[,<int>]] [startTs=<int>[,<int>]] [maxIntents=<int>] [allRevisions] [targetSize=<int>] [maxSize=<int>] [stopMidKey]
// export [k=<key>] [end=<key>] [ts=<int>[,<int>]] [kTs=<int>[,<int>]] [startTs=<int>[,<int>]] [maxIntents=<int>] [allRevisions] [targetSize=<int>] [maxSize=<int>] [stopMidKey] [fingerprint]
//
// iter_new [k=<key>] [end=<key>] [prefix] [kind=key|keyAndIntents] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [pointSynthesis] [maskBelow=<int>[,<int>]]
// iter_new_incremental [k=<key>] [end=<key>] [startTs=<int>[,<int>]] [endTs=<int>[,<int>]] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=<int>[,<int>]] [intents=error|aggregate|emit]
Expand Down Expand Up @@ -1327,22 +1327,44 @@ func cmdExport(e *evalCtx) error {
if e.hasArg("maxSize") {
e.scanArg("maxSize", &opts.MaxSize)
}
var shouldFingerprint bool
if e.hasArg("fingerprint") {
shouldFingerprint = true
}

r := e.newReader()
defer r.Close()

sstFile := &storage.MemFile{}
summary, resume, err := storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err

var summary roachpb.BulkOpSummary
var resume storage.MVCCKey
var fingerprint uint64
var err error
if shouldFingerprint {
summary, resume, fingerprint, err = storage.MVCCExportFingerprint(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err
}
e.results.buf.Printf("export: %s", &summary)
e.results.buf.Print(" fingerprint=true")
} else {
summary, resume, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err
}
e.results.buf.Printf("export: %s", &summary)
}

e.results.buf.Printf("export: %s", &summary)
if resume.Key != nil {
e.results.buf.Printf(" resume=%s", resume)
}
e.results.buf.Printf("\n")

if shouldFingerprint {
e.results.buf.Printf("fingerprint: %d\n", fingerprint)
}

iter, err := storage.NewMemSSTIterator(sstFile.Bytes(), false /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: keys.MaxKey,
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/mvcc_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,23 @@ func encodeMVCCTimestampSuffixToBuf(buf []byte, ts hlc.Timestamp) []byte {
return buf
}

// EncodeMVCCTimestampToBuf returns the Pebble encoding of ts, without the
// length suffix and sentinel byte. The supplied buf is reused when its capacity
// is large enough; otherwise a new slice is allocated. A timestamp whose
// encoded length is zero yields buf truncated to zero length.
func EncodeMVCCTimestampToBuf(buf []byte, ts hlc.Timestamp) []byte {
	tsLen := encodedMVCCTimestampLength(ts)
	switch {
	case tsLen == 0:
		// Nothing to encode.
		return buf[:0]
	case tsLen > cap(buf):
		buf = make([]byte, tsLen)
	default:
		buf = buf[:tsLen]
	}
	// The target buffer is now exactly tsLen bytes long.
	encodeMVCCTimestampToBuf(buf, ts)
	return buf
}

// encodeMVCCTimestampToBuf encodes an MVCC timestamp into its Pebble
// representation, excluding the length suffix and sentinel byte. The target
// buffer must have the correct size, and the timestamp must not be empty.
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/mvcc_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) {
"logical and synthetic": {"foo", hlc.Timestamp{Logical: 65535, Synthetic: true}, "666f6f0000000000000000000000ffff010e"},
"all": {"foo", hlc.Timestamp{WallTime: 1643550788737652545, Logical: 65535, Synthetic: true}, "666f6f0016cf10bc050557410000ffff010e"},
}

buf := []byte{}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {

Expand Down Expand Up @@ -234,6 +236,13 @@ func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) {
decodedTS, err = decodeMVCCTimestamp(encodedTS)
require.NoError(t, err)
require.Equal(t, tc.ts, decodedTS)

buf = EncodeMVCCTimestampToBuf(buf, tc.ts)
if expectTS == nil {
require.Empty(t, buf)
} else {
require.Equal(t, expectTS, buf)
}
})
}
}
Expand Down
Loading

0 comments on commit 84cdd0b

Please sign in to comment.