Merge #91124
91124: builtins: add crdb_internal.fingerprint builtin r=adityamaru a=adityamaru

This change adds a `crdb_internal.fingerprint` builtin
that accepts a `startTime`, `endTime`, `startKey`, and `endKey`
defining the interval the user wants to fingerprint. The builtin
is powered by sending an ExportRequest over the defined interval
with the `ExportFingerprint` option set to true.

Setting this option on the ExportRequest means that instead of
writing all point and range keys to an SST and sending them back to
the client, command evaluation uses the newly introduced
`fingerprintWriter` (#90848) when exporting keys. This writer
computes an `fnv64` hash over the key, timestamp, and value of each
point key, and maintains a running XOR aggregate of all the point keys
processed as part of the ExportRequest. Range keys are not fingerprinted
during command evaluation, but are instead returned to the client in a
pebble SST. This is because range keys do not have a stable, discrete
identity, so it is up to the caller to define a deterministic
fingerprinting scheme across all returned range keys.
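
To make the hashing scheme concrete, here is a minimal, hedged sketch of the idea
rather than the actual `fingerprintWriter` implementation: hash each point key's
key bytes, timestamp, and value with `fnv64`, then XOR the result into a running
aggregate. Because XOR is commutative and associative, per-range partial
fingerprints can later be combined in any order.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// pointKV is a simplified stand-in for an MVCC point key/value pair.
type pointKV struct {
	key      []byte
	wallTime int64 // MVCC timestamp, wall-clock component only, for brevity
	value    []byte
}

// fingerprintPointKeys XORs the fnv64 hash of each (key, timestamp, value)
// triple into a single running aggregate.
func fingerprintPointKeys(kvs []pointKV) uint64 {
	var agg uint64
	for _, kv := range kvs {
		h := fnv.New64()
		h.Write(kv.key)
		var ts [8]byte
		binary.BigEndian.PutUint64(ts[:], uint64(kv.wallTime))
		h.Write(ts[:])
		h.Write(kv.value)
		agg ^= h.Sum64()
	}
	return agg
}

func main() {
	kvs := []pointKV{
		{key: []byte("a"), wallTime: 1, value: []byte("v1")},
		{key: []byte("b"), wallTime: 2, value: []byte("v2")},
	}
	fmt.Printf("%x\n", fingerprintPointKeys(kvs))
}
```

How the real writer encodes the timestamp, or whether it strips the value
checksum before hashing, are implementation details of #90848; the sketch only
illustrates the XOR-of-hashes shape.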

The ExportRequest sent as part of this builtin does not set any DistSender
limit, thereby allowing concurrent execution across ranges. We are not
concerned about the ExportResponses growing too large, since the SSTs
will only contain range keys, which should be few in number. If this
assumption proves incorrect in the future, we can revisit setting
`TargetBytes` on the header of the BatchRequest.

Fixes: #89336

Release note: None

Co-authored-by: adityamaru <[email protected]>
craig[bot] and adityamaru committed Nov 24, 2022
2 parents 3cfd872 + 1acfcc8 commit b5be006
Showing 12 changed files with 633 additions and 174 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
@@ -3056,6 +3056,8 @@ may increase either contention or retry errors, or both.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.encode_key"></a><code>crdb_internal.encode_key(table_id: <a href="int.html">int</a>, index_id: <a href="int.html">int</a>, row_tuple: anyelement) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>Generate the key for a row on a particular table and index.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.fingerprint"></a><code>crdb_internal.fingerprint(span: <a href="bytes.html">bytes</a>[], start_time: <a href="timestamp.html">timestamptz</a>, all_revisions: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.force_assertion_error"></a><code>crdb_internal.force_assertion_error(msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.force_error"></a><code>crdb_internal.force_error(errorCode: <a href="string.html">string</a>, msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
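
For illustration, here is a hedged sketch of how the builtin documented above might
be invoked from a Go client over the SQL wire protocol. The connection URL, span
bounds, and start time are placeholders; the call requires an admin role, and an
`AS OF SYSTEM TIME` clause on the statement, when present, is used as the end time
of the fingerprinted interval.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Assumes a local, insecure, single-node cluster; adjust the URL as needed.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical raw span bounds; real callers would derive these from the
	// keyspace of the table or tenant they want to fingerprint.
	startKey := []byte{0xf0}
	endKey := []byte{0xf1}

	var fingerprint int64
	// A zero start_time plus all_revisions=true fingerprints every revision
	// up to the statement's timestamp.
	err = db.QueryRow(
		`SELECT crdb_internal.fingerprint(ARRAY[$1::BYTES, $2::BYTES], '1970-01-01'::TIMESTAMPTZ, true)`,
		startKey, endKey,
	).Scan(&fingerprint)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("fingerprint: %d\n", fingerprint)
}
```
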
50 changes: 33 additions & 17 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -181,19 +181,34 @@ func evalExport(
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, err := storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader,
storage.MVCCExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS},
EndKey: args.EndKey,
StartTS: args.StartTime,
EndTS: h.Timestamp,
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
MaxIntents: maxIntents,
StopMidKey: args.SplitMidKey,
ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
}, destFile)
opts := storage.MVCCExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS},
EndKey: args.EndKey,
StartTS: args.StartTime,
EndTS: h.Timestamp,
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
MaxIntents: maxIntents,
StopMidKey: args.SplitMidKey,
ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
}
var summary roachpb.BulkOpSummary
var resume storage.MVCCKey
var fingerprint uint64
var err error
if args.ExportFingerprint {
// Default to stripping the tenant prefix from keys, and the checksum from
// values, before fingerprinting so that the fingerprint is tenant-agnostic.
opts.FingerprintOptions = storage.MVCCExportFingerprintOptions{
StripTenantPrefix: true,
StripValueChecksum: true,
}
summary, resume, fingerprint, err = storage.MVCCExportFingerprint(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
} else {
summary, resume, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
}
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
@@ -217,10 +232,11 @@ func evalExport(
span.EndKey = args.EndKey
}
exported := roachpb.ExportResponse_File{
Span: span,
EndKeyTS: resume.Timestamp,
Exported: summary,
SST: data,
Span: span,
EndKeyTS: resume.Timestamp,
Exported: summary,
SST: data,
Fingerprint: fingerprint,
}
reply.Files = append(reply.Files, exported)
start = resume.Key
34 changes: 31 additions & 3 deletions pkg/roachpb/api.proto
@@ -1546,7 +1546,25 @@ message ExportRequest {
// then there is no limit.
int64 target_file_size = 10;

// ExportFingerprint, when set to true, causes ExportRequest command
// evaluation to generate an fnv64 hash over the key, timestamp, and value of
// every point key encountered in the key/time interval. Each KV hash is
// combined via XOR into a running aggregate that is returned as part of the
// ExportResponse.
//
// Range keys are not fingerprinted but are instead written to a pebble SST
// that is returned to the caller. This is because range keys do not have a
// stable, discrete identity. A range key can span multiple ranges, or it may
// be fragmented by range keys outside the time bounds that are not relevant
// to the fingerprint, so multiple export requests could be needed to piece
// together the entire range key before fingerprinting it. It is therefore up
// to the caller to define a deterministic fingerprinting scheme across all
// returned range keys.
bool export_fingerprint = 14;

reserved 2, 5, 7, 8, 11;

// Next ID: 15
}

// BulkOpSummary summarizes the data processed by an operation, counting the
@@ -1586,14 +1604,24 @@ message ExportResponse {
(gogoproto.customname) = "EndKeyTS"
];
string path = 2;
reserved 3;
reserved 4;
reserved 5;

BulkOpSummary exported = 6 [(gogoproto.nullable) = false];

bytes sst = 7 [(gogoproto.customname) = "SST"];
string locality_kv = 8 [(gogoproto.customname) = "LocalityKV"];

// Fingerprint is the XOR aggregate of the fnv64 hashes of every point key,
// timestamp, and corresponding value that has been exported as part of the
// ExportRequest. This field is only set when the request is sent with
// `ExportFingerprint` set to true.
//
// Range keys are not fingerprinted but instead written to the sst above
// that is returned to the caller. This is because range keys do not have a
// stable, discrete identity and so it is up to the caller to define a
// deterministic fingerprinting scheme across all returned range keys.
uint64 fingerprint = 10;

reserved 3, 4, 5;
}

ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
11 changes: 11 additions & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
@@ -46,6 +46,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/repstream/streampb",
"//pkg/roachpb",
@@ -88,7 +89,9 @@ go_library(
"//pkg/sql/storageparam",
"//pkg/sql/storageparam/indexstorageparam",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/arith",
"//pkg/util/bitarray",
"//pkg/util/contextutil",
@@ -142,6 +145,7 @@ go_test(
"all_builtins_test.go",
"builtins_test.go",
"datums_to_bytes_builtin_test.go",
"fingerprint_builtin_test.go",
"generator_builtins_test.go",
"geo_builtins_test.go",
"help_test.go",
@@ -158,6 +162,8 @@ go_test(
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
@@ -173,16 +179,21 @@ go_test(
"//pkg/sql/sem/tree/treewindow",
"//pkg/sql/sem/volatility",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/duration",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
166 changes: 166 additions & 0 deletions pkg/sql/sem/builtins/builtins.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/password"
@@ -72,11 +73,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/fuzzystrmatch"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/ipaddr"
"github.com/cockroachdb/cockroach/pkg/util/json"
@@ -7327,6 +7332,167 @@ expires until the statement bundle is collected`,
Volatility: volatility.Volatile,
},
),
"crdb_internal.fingerprint": makeBuiltin(
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
},
tree.Overload{
Types: tree.ArgTypes{
{"span", types.BytesArray},
{"start_time", types.TimestampTZ},
{"all_revisions", types.Bool},
// NB: The function can be called with an AOST clause that will be used
// as the `end_time` when issuing the ExportRequests for the purposes of
// fingerprinting.
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
ctx, sp := tracing.ChildSpan(ctx, "crdb_internal.fingerprint")
defer sp.Finish()

if !evalCtx.Settings.Version.IsActive(ctx, clusterversion.V23_1) {
return nil, errors.Errorf("cannot use crdb_internal.fingerprint until the cluster version is at least %s",
clusterversion.V23_1.String())
}

isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
}
if !isAdmin {
return nil, errors.New("crdb_internal.fingerprint() requires admin privilege")
}
arr := tree.MustBeDArray(args[0])
if arr.Len() != 2 {
return nil, errors.New("expected an array of two elements")
}
startKey := []byte(tree.MustBeDBytes(arr.Array[0]))
endKey := []byte(tree.MustBeDBytes(arr.Array[1]))
endTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
if evalCtx.AsOfSystemTime != nil {
endTime = evalCtx.AsOfSystemTime.Timestamp
}
header := roachpb.Header{
Timestamp: endTime,
// We set WaitPolicy to Error so that the export returns an error instead
// of blocking if it encounters intents from other transactions.
//
// TODO(adityamaru): We might need to handle WriteIntentErrors
// specially in the future so as to allow the fingerprint to complete
// in the face of intents.
WaitPolicy: lock.WaitPolicy_Error,
}
startTime := args[1].(*tree.DTimestampTZ).Time
startTimestamp := hlc.Timestamp{WallTime: startTime.UnixNano()}
allRevisions := *args[2].(*tree.DBool)
filter := roachpb.MVCCFilter_Latest
if allRevisions {
filter = roachpb.MVCCFilter_All
}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeader{Key: startKey, EndKey: endKey},
StartTime: startTimestamp,
MVCCFilter: filter,
ExportFingerprint: true,
}
admissionHeader := roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
todo := make(chan *roachpb.ExportRequest, 1)
todo <- req
ctxDone := ctx.Done()
var fingerprint uint64
// TODO(adityamaru): Memory monitor this slice of buffered SSTs that
// contain range keys across ExportRequests.
ssts := make([][]byte, 0)
for {
select {
case <-ctxDone:
return nil, ctx.Err()
case req := <-todo:
var rawResp roachpb.Response
var pErr *roachpb.Error
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest fingerprint for span %s", roachpb.Span{Key: startKey, EndKey: endKey}),
5*time.Minute, func(ctx context.Context) error {
rawResp, pErr = kv.SendWrappedWithAdmission(ctx,
evalCtx.Txn.DB().NonTransactionalSender(), header, admissionHeader, req)
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
return nil, exportRequestErr
}

resp := rawResp.(*roachpb.ExportResponse)
for _, file := range resp.Files {
fingerprint = fingerprint ^ file.Fingerprint

// Buffer the returned SSTs; the range keys they contain will be
// fingerprinted once all ExportRequests have completed.
if len(file.SST) != 0 {
ssts = append(ssts, file.SST)
}
}
if resp.ResumeSpan != nil {
if !resp.ResumeSpan.Valid() {
return nil, errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}

resumeReq := req
resumeReq.RequestHeader = roachpb.RequestHeaderFromSpan(*resp.ResumeSpan)
todo <- resumeReq
}
default:
// No ExportRequests left to send. We've aggregated range keys
// across all ExportRequests and can now fingerprint them.
//
// NB: We aggregate rangekeys across ExportRequests and then
// fingerprint them on the client, instead of fingerprinting them as
// part of the ExportRequest command evaluation, because range keys
// do not have a stable, discrete identity. Their fragmentation can
// be influenced by rangekeys outside the time interval that we are
// fingerprinting, or by range splits. So, we need to "defragment"
// all the rangekey stacks we observe such that the fragmentation is
// deterministic on only the data we want to fingerprint in our key
// and time interval.
//
// E.g.:
//
// t2 [-----)[----)
//
// t1 [----)[-----)
// a b c d
//
// Assume we have two rangekeys [a, c)@t1 and [b, d)@t2. They will
// fragment as shown in the diagram above. If we wish to fingerprint
// key [a-d) in time interval (t1, t2] the fragmented rangekey
// [a, c)@t1 is outside our time interval and should not influence our
// fingerprint. The iterator in `fingerprintRangekeys` will
// "defragment" the rangekey stacks [b-c)@t2 and [c-d)@t2 and
// fingerprint them as a single rangekey with bounds [b-d)@t2.
rangekeyFingerprint, err := storage.FingerprintRangekeys(ctx, evalCtx.Settings,
storage.MVCCExportFingerprintOptions{
StripTenantPrefix: true,
StripValueChecksum: true,
}, ssts)
if err != nil {
return nil, err
}
fingerprint = fingerprint ^ rangekeyFingerprint
return tree.NewDInt(tree.DInt(fingerprint)), nil
}
}
},
Info: "This function is used only by CockroachDB's developers for testing purposes.",
Volatility: volatility.Immutable,
},
),
}

var lengthImpls = func(incBitOverload bool) builtinDefinition {
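
To make the range-key defragmentation described in the builtin above concrete, here
is a conceptual, hedged sketch rather than the actual `storage.FingerprintRangekeys`
implementation (which operates on the returned pebble SSTs): adjacent range-key
fragments that share a timestamp and value are merged back into a single span before
hashing, so fragmentation caused by data outside the fingerprinted interval does not
change the result.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"sort"
)

// rangeKeyFragment is a simplified stand-in for a range key fragment.
type rangeKeyFragment struct {
	start, end []byte
	wallTime   int64
	value      []byte
}

// fingerprintRangeKeys merges adjacent fragments that share a timestamp and
// value, then XORs the fnv64 hash of each merged range key into an aggregate.
func fingerprintRangeKeys(frags []rangeKeyFragment) uint64 {
	sort.Slice(frags, func(i, j int) bool {
		return bytes.Compare(frags[i].start, frags[j].start) < 0
	})
	var merged []rangeKeyFragment
	for _, f := range frags {
		if n := len(merged); n > 0 && bytes.Equal(merged[n-1].end, f.start) &&
			merged[n-1].wallTime == f.wallTime && bytes.Equal(merged[n-1].value, f.value) {
			merged[n-1].end = f.end // extend the previous fragment
			continue
		}
		merged = append(merged, f)
	}
	var agg uint64
	for _, rk := range merged {
		h := fnv.New64()
		h.Write(rk.start)
		h.Write(rk.end)
		var ts [8]byte
		binary.BigEndian.PutUint64(ts[:], uint64(rk.wallTime))
		h.Write(ts[:])
		h.Write(rk.value)
		agg ^= h.Sum64()
	}
	return agg
}

func main() {
	// The [b,c)@t2 and [c,d)@t2 fragments from the diagram merge into [b,d)@t2.
	frags := []rangeKeyFragment{
		{start: []byte("b"), end: []byte("c"), wallTime: 2},
		{start: []byte("c"), end: []byte("d"), wallTime: 2},
	}
	fmt.Printf("%x\n", fingerprintRangeKeys(frags))
}
```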