Skip to content

Commit

Permalink
backupccl: create stripped crdb_internal.fingerprint overload
Browse files Browse the repository at this point in the history
This patch adds a variant of crdb_internal.fingerprint(), which creates a
"stripped" fingerprint of the target span. Namely,
`crdb_internal.fingerpint(span,true)` will return a fingerprint that is
agnostic to the mvcc timestamps and the index prefix of the key, and considers
only the latest mvcc history of the key span. This cmd should only get used
over user table space, i.e. on keys with an index prefix.

For example, suppose the user fingerprinted a table at some system time, then
backed up and restored it to that same system time. The restored table should
have the same fingerprint!

crdb_internal.fingerpint(span,false) is equivalent to
crdb_internal.fingerpint(span,NULL,LATEST).

This fingerprint variant is signicantly more scalable than SHOW EXPERIMENTAL
FINGERPRINT, as it uses export requests compared to a simple scan.

Fixes #98570

Release note: None
  • Loading branch information
msbutler committed Mar 24, 2023
1 parent b7d6f75 commit cf91202
Show file tree
Hide file tree
Showing 17 changed files with 427 additions and 156 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3127,6 +3127,8 @@ may increase either contention or retry errors, or both.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.fingerprint"></a><code>crdb_internal.fingerprint(span: <a href="bytes.html">bytes</a>[], start_time: <a href="timestamp.html">timestamptz</a>, all_revisions: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.fingerprint"></a><code>crdb_internal.fingerprint(span: <a href="bytes.html">bytes</a>[], stripped: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.force_assertion_error"></a><code>crdb_internal.force_assertion_error(msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.force_error"></a><code>crdb_internal.force_error(errorCode: <a href="string.html">string</a>, msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,8 @@ SELECT payload FROM "".crdb_internal.system_jobs ORDER BY created DESC LIMIT 10

sqlDB.Exec(t, incBackupQuery, queryArgs...)
}
bankTableID := sqlutils.QueryTableID(t, conn, "data", "public", "bank")
backupTableFingerprint := sqlutils.FingerprintTable(t, sqlDB, bankTableID)

sqlDB.Exec(t, `DROP DATABASE data CASCADE`)

Expand All @@ -1100,7 +1102,7 @@ SELECT payload FROM "".crdb_internal.system_jobs ORDER BY created DESC LIMIT 10
restoreQuery = fmt.Sprintf("%s WITH kms = %s", restoreQuery, kmsURIFmtString)
}
queryArgs := append(restoreURIArgs, kmsURIArgs...)
verifyRestoreData(t, sqlDB, storageSQLDB, restoreQuery, queryArgs, numAccounts)
verifyRestoreData(t, sqlDB, storageSQLDB, restoreQuery, queryArgs, numAccounts, backupTableFingerprint)
}

func verifyRestoreData(
Expand All @@ -1110,6 +1112,7 @@ func verifyRestoreData(
restoreQuery string,
restoreURIArgs []interface{},
numAccounts int,
bankStrippedFingerprint int,
) {
var unused string
var restored struct {
Expand Down Expand Up @@ -1160,6 +1163,8 @@ func verifyRestoreData(
t.Fatal("unexpected span start at primary index")
}
}
restorebankID := sqlutils.QueryTableID(t, sqlDB.DB, "data", "public", "bank")
require.Equal(t, bankStrippedFingerprint, sqlutils.FingerprintTable(t, sqlDB, restorebankID))
}

func TestBackupRestoreSystemTables(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backuprand/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/backuprand/backup_rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestBackupRestoreRandomDataRoundtrips conducts backup/restore roundtrips on
Expand Down Expand Up @@ -97,12 +98,13 @@ database_name = 'rand' AND schema_name = 'public'`)
}

expectedCreateTableStmt := make(map[string]string)
expectedData := make(map[string][][]string)
expectedData := make(map[string]int)
for _, tableName := range tableNames {
expectedCreateTableStmt[tableName] = sqlDB.QueryStr(t,
fmt.Sprintf(`SELECT create_statement FROM [SHOW CREATE TABLE %s]`, tree.NameString(tableName)))[0][0]
if runSchemaOnlyExtension == "" {
expectedData[tableName] = sqlDB.QueryStr(t, fmt.Sprintf(`SELECT * FROM %s`, tree.NameString(tableName)))
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "rand", "public", tableName)
expectedData[tableName] = sqlutils.FingerprintTable(t, sqlDB, tableID)
}
}

Expand Down Expand Up @@ -135,7 +137,8 @@ database_name = 'rand' AND schema_name = 'public'`)
assert.Equal(t, expectedCreateTableStmt[tableName], createStmt,
"SHOW CREATE %s not equal after RESTORE", tableName)
if runSchemaOnlyExtension == "" {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT * FROM %s`, restoreTable), expectedData[tableName])
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "restoredb", "public", tableName)
require.Equal(t, expectedData[tableName], sqlutils.FingerprintTable(t, sqlDB, tableID))
} else {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT count(*) FROM %s`, restoreTable),
[][]string{{"0"}})
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,13 @@ message ExportRequest {

reserved 2, 5, 7, 8, 11;

// Next ID: 15
FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false];

// Next ID: 16
}

message FingerprintOptions {
bool strip_index_prefix_and_timestamp = 1;
}

// BulkOpSummary summarizes the data processed by an operation, counting the
Expand Down
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,23 @@ func evalExport(
// values before fingerprinting so that the fingerprint is tenant
// agnostic.
opts.FingerprintOptions = storage.MVCCExportFingerprintOptions{
StripTenantPrefix: true,
StripValueChecksum: true,
StripTenantPrefix: true,
StripValueChecksum: true,
StripIndexPrefixAndTimestamp: args.FingerprintOptions.StripIndexPrefixAndTimestamp,
}
if opts.FingerprintOptions.StripIndexPrefixAndTimestamp && args.MVCCFilter == kvpb.MVCCFilter_All {
// If a key's value were updated from a to b, the xor hash without
// timestamps of those two mvcc values would look the same if the key
// were updated from b to a. In other words, the order of key value
// updates without timestamps does not affect the xor hash; but this
// order clearly presents different mvcc history, therefore, do not
// strip timestamps if fingerprinting all mvcc history.
return result.Result{}, errors.New("cannot fingerprint without mvcc timestamps and with mvcc history")
}
if opts.FingerprintOptions.StripIndexPrefixAndTimestamp && !args.StartTime.IsEmpty() {
// Supplying a startKey only complicates results (e.g. it surfaces
// tombstones), given that only the latest keys are surfaced.
return result.Result{}, errors.New("cannot fingerprint without mvcc timestamps and with a start time")
}
var hasRangeKeys bool
summary, resumeInfo, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(ctx,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"all_builtins.go",
"builtins.go",
"compression.go",
"fingerprint_builtins.go",
"fixed_oids.go",
"generator_builtins.go",
"generator_probe_ranges.go",
Expand Down
191 changes: 50 additions & 141 deletions pkg/sql/sem/builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/password"
Expand Down Expand Up @@ -77,10 +76,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -7524,158 +7520,61 @@ expires until the statement bundle is collected`,
Category: builtinconstants.CategorySystemInfo,
},
tree.Overload{
// If the second arg is set to true, this overload allows the caller to
// execute a "stripped" fingerprint on the latest keys in a span. This
// stripped fingerprint strips each key's timestamp and index prefix
// before hashing, enabling a user to assert that two different tables
// have the same latest keys, for example. Because the index prefix is
// stripped, this option should only get used in the table key space.
//
// If the stripped param is set to false, this overload is equivalent to
// 'crdb_internal.fingerprint(span,NULL,LATEST)'

Types: tree.ParamTypes{
{Name: "span", Typ: types.BytesArray},
{Name: "start_time", Typ: types.TimestampTZ},
{Name: "all_revisions", Typ: types.Bool},
{Name: "stripped", Typ: types.Bool},
// NB: The function can be called with an AOST clause that will be used
// as the `end_time` when issuing the ExportRequests for the purposes of
// fingerprinting.
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
ctx, sp := tracing.ChildSpan(ctx, "crdb_internal.fingerprint")
defer sp.Finish()

if !evalCtx.Settings.Version.IsActive(ctx, clusterversion.V23_1) {
return nil, errors.Errorf("cannot use crdb_internal.fingerprint until the cluster version is at least %s",
clusterversion.V23_1.String())
if len(args) != 2 {
return nil, errors.New("argument list must have two elements")
}

isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
span, err := parseSpan(args[0])
if err != nil {
return nil, err
}
if !isAdmin {
return nil, errors.New("crdb_internal.fingerprint() requires admin privilege")
skipTimestamp := bool(tree.MustBeDBool(args[1]))
return fingerprint(ctx, evalCtx, span, hlc.Timestamp{} /* allRevisions */, false,
skipTimestamp)
},
Info: "This function is used only by CockroachDB's developers for testing purposes.",
Volatility: volatility.Stable,
},
tree.Overload{
Types: tree.ParamTypes{
{Name: "span", Typ: types.BytesArray},
{Name: "start_time", Typ: types.TimestampTZ},
{Name: "all_revisions", Typ: types.Bool},
// NB: The function can be called with an AOST clause that will be used
// as the `end_time` when issuing the ExportRequests for the purposes of
// fingerprinting.
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
if len(args) != 3 {
return nil, errors.New("argument list must have three elements")
}

arr := tree.MustBeDArray(args[0])
if arr.Len() != 2 {
return nil, errors.New("expected an array of two elements")
span, err := parseSpan(args[0])
if err != nil {
return nil, err
}
startKey := []byte(tree.MustBeDBytes(arr.Array[0]))
endKey := []byte(tree.MustBeDBytes(arr.Array[1]))
startTime := args[1].(*tree.DTimestampTZ).Time
startTime := tree.MustBeDTimestampTZ(args[1]).Time
startTimestamp := hlc.Timestamp{WallTime: startTime.UnixNano()}
allRevisions := *args[2].(*tree.DBool)
filter := kvpb.MVCCFilter_Latest
if allRevisions {
filter = kvpb.MVCCFilter_All
}

header := kvpb.Header{
Timestamp: evalCtx.Txn.ReadTimestamp(),
// We set WaitPolicy to Error, so that the export will return an error
// to us instead of a blocking wait if it hits any other txns.
//
// TODO(adityamaru): We might need to handle WriteIntentErrors
// specially in the future so as to allow the fingerprint to complete
// in the face of intents.
WaitPolicy: lock.WaitPolicy_Error,
// TODO(ssd): Setting this disables async sending in
// DistSender so it likely substantially impacts
// performance.
ReturnElasticCPUResumeSpans: true,
}
admissionHeader := kvpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: kvpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}

todo := make(chan kvpb.RequestHeader, 1)
todo <- kvpb.RequestHeader{Key: startKey, EndKey: endKey}

ctxDone := ctx.Done()
var fingerprint uint64
// TODO(adityamaru): Memory monitor this slice of buffered SSTs that
// contain range keys across ExportRequests.
ssts := make([][]byte, 0)
for {
select {
case <-ctxDone:
return nil, ctx.Err()
case reqHeader := <-todo:
req := &kvpb.ExportRequest{
RequestHeader: reqHeader,
StartTime: startTimestamp,
MVCCFilter: filter,
ExportFingerprint: true,
}
var rawResp kvpb.Response
var pErr *kvpb.Error
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest fingerprint for span %s", roachpb.Span{Key: startKey, EndKey: endKey}),
5*time.Minute, func(ctx context.Context) error {
rawResp, pErr = kv.SendWrappedWithAdmission(ctx,
evalCtx.Txn.DB().NonTransactionalSender(), header, admissionHeader, req)
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
return nil, exportRequestErr
}

resp := rawResp.(*kvpb.ExportResponse)
for _, file := range resp.Files {
fingerprint = fingerprint ^ file.Fingerprint

// Aggregate all the range keys that need fingerprinting once all
// ExportRequests have been completed.
if len(file.SST) != 0 {
ssts = append(ssts, file.SST)
}
}
if resp.ResumeSpan != nil {
if !resp.ResumeSpan.Valid() {
return nil, errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}
todo <- kvpb.RequestHeaderFromSpan(*resp.ResumeSpan)
}
default:
// No ExportRequests left to send. We've aggregated range keys
// across all ExportRequests and can now fingerprint them.
//
// NB: We aggregate rangekeys across ExportRequests and then
// fingerprint them on the client, instead of fingerprinting them as
// part of the ExportRequest command evaluation, because range keys
// do not have a stable, discrete identity. Their fragmentation can
// be influenced by rangekeys outside the time interval that we are
// fingerprinting, or by range splits. So, we need to "defragment"
// all the rangekey stacks we observe such that the fragmentation is
// deterministic on only the data we want to fingerprint in our key
// and time interval.
//
// Egs:
//
// t2 [-----)[----)
//
// t1 [----)[-----)
// a b c d
//
// Assume we have two rangekeys [a, c)@t1 and [b, d)@t2. They will
// fragment as shown in the diagram above. If we wish to fingerprint
// key [a-d) in time interval (t1, t2] the fragmented rangekey
// [a, c)@t1 is outside our time interval and should not influence our
// fingerprint. The iterator in `fingerprintRangekeys` will
// "defragment" the rangekey stacks [b-c)@t2 and [c-d)@t2 and
// fingerprint them as a single rangekey with bounds [b-d)@t2.
rangekeyFingerprint, err := storage.FingerprintRangekeys(ctx, evalCtx.Settings,
storage.MVCCExportFingerprintOptions{
StripTenantPrefix: true,
StripValueChecksum: true,
}, ssts)
if err != nil {
return nil, err
}
fingerprint = fingerprint ^ rangekeyFingerprint
return tree.NewDInt(tree.DInt(fingerprint)), nil
}
}
allRevisions := bool(tree.MustBeDBool(args[2]))
return fingerprint(ctx, evalCtx, span, startTimestamp, allRevisions /* stripped */, false)
},
Info: "This function is used only by CockroachDB's developers for testing purposes.",
Volatility: volatility.Stable,
Expand Down Expand Up @@ -10595,3 +10494,13 @@ func prettyStatement(p tree.PrettyCfg, stmt string) (string, error) {
}
return formattedStmt.String(), nil
}

func parseSpan(arg tree.Datum) (roachpb.Span, error) {
arr := tree.MustBeDArray(arg)
if arr.Len() != 2 {
return roachpb.Span{}, errors.New("expected an array of two elements")
}
startKey := []byte(tree.MustBeDBytes(arr.Array[0]))
endKey := []byte(tree.MustBeDBytes(arr.Array[1]))
return roachpb.Span{Key: startKey, EndKey: endKey}, nil
}
21 changes: 21 additions & 0 deletions pkg/sql/sem/builtins/fingerprint_builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,24 @@ FROM
db.QueryRow(t, fingerprintQuery).Scan(&fingerprint4)
require.NotEqual(t, fingerprint1, fingerprint4)
}

func TestFingerprintStripped(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(sqlDB)
db.Exec(t, "CREATE DATABASE IF NOT EXISTS test")
db.Exec(t, "CREATE TABLE test.test (k PRIMARY KEY) AS SELECT generate_series(1, 10)")

// Create the same sql rows in a different table, committed at a different timestamp.
db.Exec(t, "CREATE TABLE test.test2 (k PRIMARY KEY) AS SELECT generate_series(1, 10)")

strippedFingerprint := func(tableName string) int {
tableID := sqlutils.QueryTableID(t, sqlDB, "test", "public", tableName)
return sqlutils.FingerprintTable(t, db, tableID)
}
require.Equal(t, strippedFingerprint("test"), strippedFingerprint("test2"))
}
Loading

0 comments on commit cf91202

Please sign in to comment.