Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add MVCCExportFingerprint method and fingerprintWriter #90848

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
)

// SSTTargetSizeSetting is the cluster setting name for the
Expand Down Expand Up @@ -105,14 +104,6 @@ func evalExport(
ctx, evalExportSpan := tracing.ChildSpan(ctx, "evalExport")
defer evalExportSpan.Finish()

var evalExportTrace types.StringValue
if cArgs.EvalCtx.NodeID() == h.GatewayNodeID {
evalExportTrace.Value = fmt.Sprintf("evaluating Export on gateway node %d", cArgs.EvalCtx.NodeID())
} else {
evalExportTrace.Value = fmt.Sprintf("evaluating Export on remote node %d", cArgs.EvalCtx.NodeID())
}
evalExportSpan.RecordStructured(&evalExportTrace)

// Table's marked to be excluded from backup are expected to be configured
// with a short GC TTL. Additionally, backup excludes such table's from being
// protected from GC when writing ProtectedTimestamp records. The
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"doc.go",
"engine.go",
"engine_key.go",
"fingerprint_writer.go",
"in_mem.go",
"intent_interleaving_iter.go",
"intent_reader_writer.go",
Expand Down
146 changes: 146 additions & 0 deletions pkg/storage/fingerprint_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"context"
"hash"
"io"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/errors"
)

// fingerprintWriter hashes every key/timestamp and value for point keys, and
// combines their hashes via a XOR into a running aggregate.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
//
// The caller must Finish() and Close() the fingerprintWriter to finalize the
// writes to the underlying pebble SST.
type fingerprintWriter struct {
hasher hash.Hash64
timestampBuf []byte

sstWriter *SSTWriter
xorAgg *uintXorAggregate
}

// makeFingerprintWriter creates a new fingerprintWriter.
func makeFingerprintWriter(
ctx context.Context, hasher hash.Hash64, cs *cluster.Settings, f io.Writer,
) fingerprintWriter {
// TODO(adityamaru,dt): Once
// https://github.com/cockroachdb/cockroach/issues/90450 has been addressed we
// should write to a kvBuf instead of a Backup SST writer.
sstWriter := MakeBackupSSTWriter(ctx, cs, f)
return fingerprintWriter{
sstWriter: &sstWriter,
hasher: hasher,
xorAgg: &uintXorAggregate{},
}
}

type uintXorAggregate struct {
sum uint64
}

// add inserts one value into the running xor.
func (a *uintXorAggregate) add(x uint64) {
a.sum = a.sum ^ x
}

// result returns the xor.
func (a *uintXorAggregate) result() uint64 {
return a.sum
}

// Finish finalizes the underlying SSTWriter, and returns the aggregated
// fingerprint for point keys.
func (f *fingerprintWriter) Finish() (uint64, error) {
// If no records were added to the sstable, skip completing it.
if f.sstWriter.DataSize != 0 {
if err := f.sstWriter.Finish(); err != nil {
return 0, err
}
}
return f.xorAgg.result(), nil
}

// Close finishes and frees memory and other resources. Close is idempotent.
func (f *fingerprintWriter) Close() {
if f.sstWriter == nil {
return
}
f.sstWriter.Close()
f.hasher.Reset()
f.xorAgg = nil
f.sstWriter = nil
}

var _ ExportWriter = &fingerprintWriter{}

// PutRawMVCCRangeKey implements the Writer interface.
func (f *fingerprintWriter) PutRawMVCCRangeKey(key MVCCRangeKey, bytes []byte) error {
// We do not fingerprint range keys, instead, we write them to a Pebble SST.
// This is because range keys do not have a stable, discrete identity and so
// it is up to the caller to define a deterministic fingerprinting scheme
// across all returned range keys.
return f.sstWriter.PutRawMVCCRangeKey(key, bytes)
}

// PutRawMVCC implements the Writer interface.
func (f *fingerprintWriter) PutRawMVCC(key MVCCKey, value []byte) error {
defer f.hasher.Reset()

// Hash the key/timestamp and value of the RawMVCC.
if err := f.hash(key.Key); err != nil {
return err
}
f.timestampBuf = EncodeMVCCTimestampToBuf(f.timestampBuf, key.Timestamp)
if err := f.hash(f.timestampBuf); err != nil {
return err
}
if err := f.hash(value); err != nil {
return err
}
f.xorAgg.add(f.hasher.Sum64())
return nil
}

// PutUnversioned implements the Writer interface.
func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error {
defer f.hasher.Reset()

// Hash the key and value in the absence of a timestamp.
if err := f.hash(key); err != nil {
return err
}
if err := f.hash(value); err != nil {
return err
}

f.xorAgg.add(f.hasher.Sum64())
return nil
}

func (f *fingerprintWriter) hash(data []byte) error {
if _, err := f.hasher.Write(data); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
`"It never returns an error." -- https://golang.org/pkg/hash: %T`, f)
}

return nil
}
122 changes: 96 additions & 26 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"hash/fnv"
"io"
"math"
"runtime"
Expand Down Expand Up @@ -5766,9 +5767,93 @@ func MVCCIsSpanEmpty(
return !valid, nil
}

// MVCCExportFingerprint exports a fingerprint for point keys in the keyrange
// [StartKey, EndKey) over the interval (StartTS, EndTS]. Each key/timestamp and
// value is hashed using a fnv64 hasher, and combined into a running aggregate
// via a XOR. On completion of the export this aggregate is returned as the
// fingerprint.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
func MVCCExportFingerprint(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, uint64, error) {
ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportToSST")
defer span.Finish()

hasher := fnv.New64()
fingerprintWriter := makeFingerprintWriter(ctx, hasher, cs, dest)
defer fingerprintWriter.Close()

summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &fingerprintWriter)
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, 0, err
}

fingerprint, err := fingerprintWriter.Finish()
return summary, resumeKey, fingerprint, err
}

// MVCCExportToSST exports changes to the keyrange [StartKey, EndKey) over the
// interval (StartTS, EndTS] as a Pebble SST. See MVCCExportOptions for options.
// StartTS may be zero.
// interval (StartTS, EndTS] as a Pebble SST. See mvccExportToWriter for more
// details.
func MVCCExportToSST(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, error) {
ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportToSST")
defer span.Finish()
sstWriter := MakeBackupSSTWriter(ctx, cs, dest)
defer sstWriter.Close()

summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &sstWriter)
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}

if summary.DataSize == 0 {
// If no records were added to the sstable, skip completing it and return a
// nil slice – the export code will discard it anyway (based on 0 DataSize).
return roachpb.BulkOpSummary{}, MVCCKey{}, nil
}

return summary, resumeKey, sstWriter.Finish()
}

// ExportWriter is a trimmed down version of the Writer interface. It contains
// only those methods used during ExportRequest command evaluation.
type ExportWriter interface {
// PutRawMVCCRangeKey writes an MVCC range key with the provided encoded
// MVCCValue. It will replace any overlapping range keys at the given
// timestamp (even partial overlap). Only MVCC range tombstones, i.e. an empty
// value, are currently allowed (other kinds will need additional handling in
// MVCC APIs and elsewhere, e.g. stats and GC). It can be used to avoid
// decoding and immediately re-encoding an MVCCValue, but should generally be
// avoided due to the lack of type safety.
//
// It is safe to modify the contents of the arguments after PutRawMVCCRangeKey
// returns.
PutRawMVCCRangeKey(MVCCRangeKey, []byte) error
// PutRawMVCC sets the given key to the encoded MVCCValue. It requires that
// the timestamp is non-empty (see {PutUnversioned,PutIntent} if the timestamp
// is empty). It can be used to avoid decoding and immediately re-encoding an
// MVCCValue, but should generally be avoided due to the lack of type safety.
//
// It is safe to modify the contents of the arguments after PutRawMVCC
// returns.
PutRawMVCC(key MVCCKey, value []byte) error
// PutUnversioned sets the given key to the value provided. It is for use
// with inline metadata (not intents) and other unversioned keys (like
// Range-ID local keys).
//
// It is safe to modify the contents of the arguments after Put returns.
PutUnversioned(key roachpb.Key, value []byte) error
}

// mvccExportToWriter exports changes to the keyrange [StartKey, EndKey) over
// the interval (StartTS, EndTS] to the passed in writer. See MVCCExportOptions
// for options. StartTS may be zero.
//
// This comes in two principal flavors: all revisions or latest revision only.
// In all-revisions mode, exports everything matching the span and time bounds,
Expand All @@ -5788,17 +5873,12 @@ func MVCCIsSpanEmpty(
// intents outside are ignored.
//
// Returns an export summary and a resume key that allows resuming the export if
// it reached a limit. Data is written to dest as it is collected. If an error
// is returned then dest contents are undefined.
func MVCCExportToSST(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
// it reached a limit. Data is written to the writer as it is collected. If an
// error is returned then the writer's contents are undefined. It is the
// responsibility of the caller to Finish() / Close() the passed in writer.
func mvccExportToWriter(
ctx context.Context, reader Reader, opts MVCCExportOptions, writer ExportWriter,
) (roachpb.BulkOpSummary, MVCCKey, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST")
defer span.Finish()
sstWriter := MakeBackupSSTWriter(ctx, cs, dest)
defer sstWriter.Close()

// If we're not exporting all revisions then we can mask point keys below any
// MVCC range tombstones, since we don't care about them.
var rangeKeyMasking hlc.Timestamp
Expand Down Expand Up @@ -5936,7 +6016,7 @@ func MVCCExportToSST(
}
// Export only the inner roachpb.Value, not the MVCCValue header.
rawValue := mvccValue.Value.RawBytes
if err := sstWriter.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
if err := writer.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
}
Expand Down Expand Up @@ -6047,11 +6127,11 @@ func MVCCExportToSST(
if unsafeKey.Timestamp.IsEmpty() {
// This should never be an intent since the incremental iterator returns
// an error when encountering intents.
if err := sstWriter.PutUnversioned(unsafeKey.Key, unsafeValue); err != nil {
if err := writer.PutUnversioned(unsafeKey.Key, unsafeValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "adding key %s", unsafeKey)
}
} else {
if err := sstWriter.PutRawMVCC(unsafeKey, unsafeValue); err != nil {
if err := writer.PutRawMVCC(unsafeKey, unsafeValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "adding key %s", unsafeKey)
}
}
Expand Down Expand Up @@ -6109,23 +6189,13 @@ func MVCCExportToSST(
}
// Export only the inner roachpb.Value, not the MVCCValue header.
rawValue := mvccValue.Value.RawBytes
if err := sstWriter.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
if err := writer.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
}
rows.BulkOpSummary.DataSize += rangeKeysSize
}

if rows.BulkOpSummary.DataSize == 0 {
// If no records were added to the sstable, skip completing it and return a
// nil slice – the export code will discard it anyway (based on 0 DataSize).
return roachpb.BulkOpSummary{}, MVCCKey{}, nil
}

if err := sstWriter.Finish(); err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}

return rows.BulkOpSummary, resumeKey, nil
}

Expand Down
Loading