Skip to content

Commit

Permalink
storage: add MVCCExportFingerprint method and fingerprintWriter
Browse files Browse the repository at this point in the history
This change introduces a fingerprintWriter that
hashes every key/timestamp and value for point keys, and
combines their hashes via a XOR into a running aggregate.
Range keys are not fingerprinted but instead written to a pebble SST that is
returned to the caller. This is because range keys do not have a stable,
discrete identity and so it is up to the caller to define a deterministic
fingerprinting scheme across all returned range keys.

The fingerprintWriter is used by `MVCCExportFingerprint` that
exports a fingerprint for point keys in the keyrange
[StartKey, EndKey) over the interval (StartTS, EndTS].
The export logic used by `MVCCExportFingerprint` is the same
that drives `MVCCExportToSST`. The former writes to a fingerprintWriter
while the latter writes to an sstWriter. Currently, this
method only supports using an `fnv64` hasher to fingerprint each KV.

This change does not wire `MVCCExportFingerprint` to ExportRequest
command evaluation. This will be done as a followup.

Informs: #89336

Release note: None
  • Loading branch information
adityamaru committed Nov 7, 2022
1 parent 50ced42 commit 7e56760
Show file tree
Hide file tree
Showing 7 changed files with 1,218 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"doc.go",
"engine.go",
"engine_key.go",
"fingerprint_writer.go",
"in_mem.go",
"intent_interleaving_iter.go",
"intent_reader_writer.go",
Expand Down
155 changes: 155 additions & 0 deletions pkg/storage/fingerprint_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"context"
"hash"
"io"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/errors"
)

// fingerprintWriter hashes every key/timestamp and value for point keys, and
// combines their hashes via a XOR into a running aggregate.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
//
// The caller must Finish() and Close() the fingerprintWriter to finalize the
// writes to the underlying pebble SST.
type fingerprintWriter struct {
// hasher computes the hash of a single point KV. It is reset at the end of
// every PutRawMVCC and PutUnversioned call, and once more in Close.
hasher hash.Hash64
// timestampBuf is a scratch buffer reused across PutRawMVCC calls to hold the
// encoded MVCC timestamp before it is fed to the hasher.
timestampBuf []byte

// sstWriter receives the range keys passed to PutRawMVCCRangeKey. It is set
// to nil by Close, which is how Close detects that it has already run.
sstWriter *SSTWriter
// xorAgg is the running XOR of the per-KV hashes. It is set to nil by Close.
xorAgg *uintXorAggregate
}

// makeFingerprintWriter returns a fingerprintWriter that hashes point keys
// with the supplied hasher and writes range keys to a backup SST writer backed
// by f. The XOR aggregate starts out at zero.
func makeFingerprintWriter(
	ctx context.Context, hasher hash.Hash64, cs *cluster.Settings, f io.Writer,
) fingerprintWriter {
	// TODO(adityamaru,dt): Once
	// https://github.com/cockroachdb/cockroach/issues/90450 has been addressed we
	// should write to a kvBuf instead of a Backup SST writer.
	backupWriter := MakeBackupSSTWriter(ctx, cs, f)

	var fw fingerprintWriter
	fw.hasher = hasher
	fw.sstWriter = &backupWriter
	fw.xorAgg = new(uintXorAggregate)
	return fw
}

// uintXorAggregate accumulates uint64 values by XOR-ing them together. The
// zero value is an empty aggregate whose result is 0.
type uintXorAggregate struct {
	sum uint64
}

// add folds x into the running XOR.
func (agg *uintXorAggregate) add(x uint64) {
	agg.sum ^= x
}

// result reports the XOR of every value added so far.
func (agg *uintXorAggregate) result() uint64 {
	return agg.sum
}

// Finish completes the underlying SSTWriter when something was written to it
// and returns the XOR-aggregated fingerprint of the point keys. If finishing
// the SSTWriter fails, the fingerprint is reported as 0 alongside the error.
func (f *fingerprintWriter) Finish() (uint64, error) {
	// An sstable that never received any records is left uncompleted.
	if f.sstWriter.DataSize == 0 {
		return f.xorAgg.result(), nil
	}
	if err := f.sstWriter.Finish(); err != nil {
		return 0, err
	}
	return f.xorAgg.result(), nil
}

// Close closes the underlying SSTWriter, resets the hasher, and drops the
// references to the aggregate and the SSTWriter. Calling Close again after
// that is a no-op.
func (f *fingerprintWriter) Close() {
	// A nil sstWriter marks a writer that has already been closed.
	if f.sstWriter != nil {
		f.sstWriter.Close()
		f.hasher.Reset()
		f.xorAgg, f.sstWriter = nil, nil
	}
}

// Compile-time assertion that *fingerprintWriter implements ExportWriter.
var _ ExportWriter = &fingerprintWriter{}

// PutRawMVCCRangeKey implements the ExportWriter interface. The range key is
// forwarded unchanged to the underlying SSTWriter and does not contribute to
// the point-key fingerprint.
func (f *fingerprintWriter) PutRawMVCCRangeKey(key MVCCRangeKey, bytes []byte) error {
// We do not fingerprint range keys, instead, we write them to a Pebble SST.
// This is because range keys do not have a stable, discrete identity and so
// it is up to the caller to define a deterministic fingerprinting scheme
// across all returned range keys.
return f.sstWriter.PutRawMVCCRangeKey(key, bytes)
}

// PutRawMVCC implements the ExportWriter interface. It hashes the key bytes,
// the encoded timestamp and the value, in that order, and XORs the resulting
// 64-bit hash into the running aggregate. A key whose timestamp encodes to
// zero bytes is rejected with an assertion error. The hasher is reset before
// returning, on both the success and the error paths.
func (f *fingerprintWriter) PutRawMVCC(key MVCCKey, value []byte) error {
	defer f.hasher.Reset()

	// Key bytes go in first.
	if err := f.hash(key.Key); err != nil {
		return err
	}

	// Then the encoded timestamp, staged in the reusable scratch buffer.
	n := encodedMVCCTimestampLength(key.Timestamp)
	switch {
	case n == 0:
		return errors.AssertionFailedf("PutRawMVCC should not be called with an empty timestamp")
	case cap(f.timestampBuf) < n:
		f.timestampBuf = make([]byte, n)
	default:
		f.timestampBuf = f.timestampBuf[:n]
	}
	encodeMVCCTimestampToBuf(f.timestampBuf, key.Timestamp)
	if err := f.hash(f.timestampBuf); err != nil {
		return err
	}

	// Finally the value.
	if err := f.hash(value); err != nil {
		return err
	}

	f.xorAgg.add(f.hasher.Sum64())
	return nil
}

// PutUnversioned implements the ExportWriter interface. With no timestamp to
// include, only the key and the value are hashed, in that order, and the
// resulting 64-bit hash is XORed into the running aggregate. The hasher is
// reset before returning.
func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error {
	defer f.hasher.Reset()

	// Hash the key, then the value; stop at the first failure.
	err := f.hash(key)
	if err == nil {
		err = f.hash(value)
	}
	if err != nil {
		return err
	}

	f.xorAgg.add(f.hasher.Sum64())
	return nil
}

// hash feeds data into the writer's hasher. hash.Hash documents that Write
// never returns an error, so any error is surfaced as an assertion failure
// wrapping the original error.
func (f *fingerprintWriter) hash(data []byte) error {
	if _, err := f.hasher.Write(data); err != nil {
		// Report the concrete type of the hasher that broke the contract. The
		// type of the enclosing writer is always the same and identifies
		// nothing.
		return errors.NewAssertionErrorWithWrappedErrf(err,
			`"It never returns an error." -- https://golang.org/pkg/hash: %T`, f.hasher)
	}

	return nil
}
65 changes: 61 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"hash/fnv"
"io"
"math"
"runtime"
Expand Down Expand Up @@ -5766,6 +5767,35 @@ func MVCCIsSpanEmpty(
return !valid, nil
}

// MVCCExportFingerprint exports a fingerprint for point keys in the keyrange
// [StartKey, EndKey) over the interval (StartTS, EndTS]. Each key/timestamp and
// value is hashed using a fnv64 hasher, and combined into a running aggregate
// via a XOR. On completion of the export this aggregate is returned as the
// fingerprint.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
//
// The range-key SST is written to dest. The returned values are the summary
// and resume key produced by mvccExportToWriter, followed by the point-key
// fingerprint. If the export itself fails, zero values are returned together
// with the error.
func MVCCExportFingerprint(
	ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, uint64, error) {
	// Name the span after this function so that traces can tell fingerprint
	// exports apart from MVCCExportToSST.
	ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportFingerprint")
	defer span.Finish()

	fw := makeFingerprintWriter(ctx, fnv.New64(), cs, dest)
	defer fw.Close()

	summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &fw)
	if err != nil {
		return roachpb.BulkOpSummary{}, MVCCKey{}, 0, err
	}

	// Finish finalizes the range-key SST and yields the aggregated fingerprint.
	fingerprint, err := fw.Finish()
	return summary, resumeKey, fingerprint, err
}

// MVCCExportToSST exports changes to the keyrange [StartKey, EndKey) over the
// interval (StartTS, EndTS] as a Pebble SST. See mvccExportToWriter for more
// details.
Expand All @@ -5791,6 +5821,36 @@ func MVCCExportToSST(
return summary, resumeKey, sstWriter.Finish()
}

// ExportWriter is a trimmed down version of the Writer interface. It contains
// only those methods used during ExportRequest command evaluation.
type ExportWriter interface {
// PutRawMVCCRangeKey writes an MVCC range key with the provided encoded
// MVCCValue. It will replace any overlapping range keys at the given
// timestamp (even partial overlap). Only MVCC range tombstones, i.e. an empty
// value, are currently allowed (other kinds will need additional handling in
// MVCC APIs and elsewhere, e.g. stats and GC). It can be used to avoid
// decoding and immediately re-encoding an MVCCValue, but should generally be
// avoided due to the lack of type safety.
//
// It is safe to modify the contents of the arguments after PutRawMVCCRangeKey
// returns.
PutRawMVCCRangeKey(MVCCRangeKey, []byte) error
// PutRawMVCC sets the given key to the encoded MVCCValue. It requires that
// the timestamp is non-empty (see {PutUnversioned,PutIntent} if the timestamp
// is empty). It can be used to avoid decoding and immediately re-encoding an
// MVCCValue, but should generally be avoided due to the lack of type safety.
//
// It is safe to modify the contents of the arguments after PutRawMVCC
// returns.
PutRawMVCC(key MVCCKey, value []byte) error
// PutUnversioned sets the given key to the value provided. It is for use
// with inline metadata (not intents) and other unversioned keys (like
// Range-ID local keys).
//
// It is safe to modify the contents of the arguments after PutUnversioned
// returns.
PutUnversioned(key roachpb.Key, value []byte) error
}

// mvccExportToWriter exports changes to the keyrange [StartKey, EndKey) over
// the interval (StartTS, EndTS] to the passed in writer. See MVCCExportOptions
// for options. StartTS may be zero.
Expand All @@ -5817,11 +5877,8 @@ func MVCCExportToSST(
// error is returned then the writer's contents are undefined. It is the
// responsibility of the caller to Finish() / Close() the passed in writer.
func mvccExportToWriter(
ctx context.Context, reader Reader, opts MVCCExportOptions, writer Writer,
ctx context.Context, reader Reader, opts MVCCExportOptions, writer ExportWriter,
) (roachpb.BulkOpSummary, MVCCKey, error) {
ctx, span := tracing.ChildSpan(ctx, "storage.mvccExportToWriter")
defer span.Finish()

// If we're not exporting all revisions then we can mask point keys below any
// MVCC range tombstones, since we don't care about them.
var rangeKeyMasking hlc.Timestamp
Expand Down
73 changes: 50 additions & 23 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,25 +689,26 @@ var commands = map[string]cmd{
"check_intent": {typReadOnly, cmdCheckIntent},
"add_lock": {typLocksUpdate, cmdAddLock},

"clear": {typDataUpdate, cmdClear},
"clear_range": {typDataUpdate, cmdClearRange},
"clear_rangekey": {typDataUpdate, cmdClearRangeKey},
"clear_time_range": {typDataUpdate, cmdClearTimeRange},
"cput": {typDataUpdate, cmdCPut},
"del": {typDataUpdate, cmdDelete},
"del_range": {typDataUpdate, cmdDeleteRange},
"del_range_ts": {typDataUpdate, cmdDeleteRangeTombstone},
"del_range_pred": {typDataUpdate, cmdDeleteRangePredicate},
"export": {typReadOnly, cmdExport},
"get": {typReadOnly, cmdGet},
"gc_clear_range": {typDataUpdate, cmdGCClearRange},
"increment": {typDataUpdate, cmdIncrement},
"initput": {typDataUpdate, cmdInitPut},
"merge": {typDataUpdate, cmdMerge},
"put": {typDataUpdate, cmdPut},
"put_rangekey": {typDataUpdate, cmdPutRangeKey},
"scan": {typReadOnly, cmdScan},
"is_span_empty": {typReadOnly, cmdIsSpanEmpty},
"clear": {typDataUpdate, cmdClear},
"clear_range": {typDataUpdate, cmdClearRange},
"clear_rangekey": {typDataUpdate, cmdClearRangeKey},
"clear_time_range": {typDataUpdate, cmdClearTimeRange},
"cput": {typDataUpdate, cmdCPut},
"del": {typDataUpdate, cmdDelete},
"del_range": {typDataUpdate, cmdDeleteRange},
"del_range_ts": {typDataUpdate, cmdDeleteRangeTombstone},
"del_range_pred": {typDataUpdate, cmdDeleteRangePredicate},
"export": {typReadOnly, cmdExport},
"export_fingerprint": {typReadOnly, cmdExportFingerprint},
"get": {typReadOnly, cmdGet},
"gc_clear_range": {typDataUpdate, cmdGCClearRange},
"increment": {typDataUpdate, cmdIncrement},
"initput": {typDataUpdate, cmdInitPut},
"merge": {typDataUpdate, cmdMerge},
"put": {typDataUpdate, cmdPut},
"put_rangekey": {typDataUpdate, cmdPutRangeKey},
"scan": {typReadOnly, cmdScan},
"is_span_empty": {typReadOnly, cmdIsSpanEmpty},

"iter_new": {typReadOnly, cmdIterNew},
"iter_new_incremental": {typReadOnly, cmdIterNewIncremental}, // MVCCIncrementalIterator
Expand Down Expand Up @@ -1308,6 +1309,11 @@ func cmdIsSpanEmpty(e *evalCtx) error {
})
}

// cmdExportFingerprint runs cmdExport with e.exportFingerprint set, which
// makes cmdExport call MVCCExportFingerprint instead of MVCCExportToSST.
func cmdExportFingerprint(e *evalCtx) error {
	e.exportFingerprint = true
	// Clear the flag once this command is done so that it cannot leak into a
	// later plain "export" command evaluated with the same evalCtx.
	// NOTE(review): assumes evalCtx may be reused across commands -- confirm.
	defer func() { e.exportFingerprint = false }()
	return cmdExport(e)
}

func cmdExport(e *evalCtx) error {
key, endKey := e.getKeyRange()
opts := storage.MVCCExportOptions{
Expand All @@ -1332,17 +1338,37 @@ func cmdExport(e *evalCtx) error {
defer r.Close()

sstFile := &storage.MemFile{}
summary, resume, err := storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err

var summary roachpb.BulkOpSummary
var resume storage.MVCCKey
var fingerprint uint64
var err error
if e.exportFingerprint {
summary, resume, fingerprint, err = storage.MVCCExportFingerprint(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err
}
e.results.buf.Printf("export_fingerprint: %s", &summary)
} else {
summary, resume, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err
}
e.results.buf.Printf("export: %s", &summary)
}

e.results.buf.Printf("export: %s", &summary)
if resume.Key != nil {
e.results.buf.Printf(" resume=%s", resume)
}
e.results.buf.Printf("\n")

if e.exportFingerprint {
oracle := storage.MakeFingerprintOracle(e.st, e.engine, opts)
expectedFingerprint, _ := oracle.GetFingerprintAndRangeKeys(e.ctx, e.t)
require.Equal(e.t, expectedFingerprint, fingerprint)
e.results.buf.Printf("pointkeys_fingerprint_verified\n")
}

iter, err := storage.NewMemSSTIterator(sstFile.Bytes(), false /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: keys.MaxKey,
Expand Down Expand Up @@ -1967,6 +1993,7 @@ type evalCtx struct {
sstWriter *storage.SSTWriter
sstFile *storage.MemFile
ssts [][]byte
exportFingerprint bool
}

func newEvalCtx(ctx context.Context, engine storage.Engine) *evalCtx {
Expand Down
Loading

0 comments on commit 7e56760

Please sign in to comment.