Skip to content

Commit

Permalink
backupccl: create stripped crdb_internal.fingerprint overload
Browse files Browse the repository at this point in the history
This patch adds a variant of crdb_internal.fingerprint(), which creates a
"stripped" fingerprint of the target span. Namely,
`crdb_internal.fingerping(span,true)` will return a fingerprint that is
agnostic to the mvcc timestamps and the index prefix of the key, and considers
only the latest mvcc history of the key span.

For example, suppose the user fingerprinted a table at some system time, then
backed up and restored it to that same system time. The restored table should
have the same fingerprint!

This fingerprint variant is signicantly more scalable than SHOW EXPERIMENTAL
FINGERPRINT, as it uses export requests compared to a simple scan.

Fixes cockroachdb#98570

Release note: None
  • Loading branch information
msbutler committed Mar 21, 2023
1 parent ca5ae38 commit dedb24d
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 162 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3111,6 +3111,8 @@ may increase either contention or retry errors, or both.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.fingerprint"></a><code>crdb_internal.fingerprint(span: <a href="bytes.html">bytes</a>[], start_time: <a href="timestamp.html">timestamptz</a>, all_revisions: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.fingerprint"></a><code>crdb_internal.fingerprint(span: <a href="bytes.html">bytes</a>[], stripped: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.force_assertion_error"></a><code>crdb_internal.force_assertion_error(msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.force_error"></a><code>crdb_internal.force_error(errorCode: <a href="string.html">string</a>, msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
Expand Down
41 changes: 40 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,8 @@ func backupAndRestore(

sqlDB.Exec(t, incBackupQuery, queryArgs...)
}
bankTableID := sqlutils.QueryTableID(t, conn, "data", "public", "bank")
backupTableFingerprint := sqlutils.FingerprintTable(t, sqlDB, bankTableID)

sqlDB.Exec(t, `DROP DATABASE data CASCADE`)

Expand All @@ -1021,7 +1023,7 @@ func backupAndRestore(
restoreQuery = fmt.Sprintf("%s WITH kms = %s", restoreQuery, kmsURIFmtString)
}
queryArgs := append(restoreURIArgs, kmsURIArgs...)
verifyRestoreData(t, sqlDB, storageSQLDB, restoreQuery, queryArgs, numAccounts)
verifyRestoreData(t, sqlDB, storageSQLDB, restoreQuery, queryArgs, numAccounts, backupTableFingerprint)
}

func verifyRestoreData(
Expand All @@ -1031,6 +1033,7 @@ func verifyRestoreData(
restoreQuery string,
restoreURIArgs []interface{},
numAccounts int,
bankStrippedFingerprint int,
) {
var unused string
var restored struct {
Expand Down Expand Up @@ -1081,6 +1084,42 @@ func verifyRestoreData(
t.Fatal("unexpected span start at primary index")
}
}
restorebankID := sqlutils.QueryTableID(t, sqlDB.DB, "data", "public", "bank")
require.Equal(t, bankStrippedFingerprint, sqlutils.FingerprintTable(t, sqlDB, restorebankID))
}

func TestBackupRestoreFingerprintStripped(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(sqlDB)
db.Exec(t, "CREATE DATABASE IF NOT EXISTS test")
db.Exec(t, "CREATE TABLE test.test (k PRIMARY KEY) AS SELECT generate_series(1, 10)")
db.Exec(t, `BACKUP DATABASE test INTO "userfile:///foo"`)
backupTableID := sqlutils.QueryTableID(t, sqlDB, "test", "public", "test")

strippedFingerprint := func(tableID int) int {
skipTSfingerprintQuery := fmt.Sprintf(`
SELECT *
FROM
crdb_internal.fingerprint(
crdb_internal.table_span(%d),
true
)`, tableID)

var fingerprint int
require.NoError(t, sqlDB.QueryRow(skipTSfingerprintQuery).Scan(&fingerprint))
return fingerprint
}
fp1 := strippedFingerprint(int(backupTableID))

db.Exec(t, `RESTORE DATABASE test FROM LATEST IN "userfile:///foo" WITH new_db_name = "test2"`)
restoreTableID := sqlutils.QueryTableID(t, sqlDB, "test2", "public", "test")
fp2 := strippedFingerprint(int(restoreTableID))
require.Equal(t, fp1, fp2)
}

func TestBackupRestoreSystemTables(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backuprand/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/backuprand/backup_rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestBackupRestoreRandomDataRoundtrips conducts backup/restore roundtrips on
Expand Down Expand Up @@ -97,12 +98,13 @@ database_name = 'rand' AND schema_name = 'public'`)
}

expectedCreateTableStmt := make(map[string]string)
expectedData := make(map[string][][]string)
expectedData := make(map[string]int)
for _, tableName := range tableNames {
expectedCreateTableStmt[tableName] = sqlDB.QueryStr(t,
fmt.Sprintf(`SELECT create_statement FROM [SHOW CREATE TABLE %s]`, tree.NameString(tableName)))[0][0]
if runSchemaOnlyExtension == "" {
expectedData[tableName] = sqlDB.QueryStr(t, fmt.Sprintf(`SELECT * FROM %s`, tree.NameString(tableName)))
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "rand", "public", tableName)
expectedData[tableName] = sqlutils.FingerprintTable(t, sqlDB, tableID)
}
}

Expand Down Expand Up @@ -135,7 +137,8 @@ database_name = 'rand' AND schema_name = 'public'`)
assert.Equal(t, expectedCreateTableStmt[tableName], createStmt,
"SHOW CREATE %s not equal after RESTORE", tableName)
if runSchemaOnlyExtension == "" {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT * FROM %s`, restoreTable), expectedData[tableName])
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "restoredb", "public", tableName)
require.Equal(t, expectedData[tableName], sqlutils.FingerprintTable(t, sqlDB, tableID))
} else {
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT count(*) FROM %s`, restoreTable),
[][]string{{"0"}})
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1688,7 +1688,13 @@ message ExportRequest {

reserved 2, 5, 7, 8, 11;

// Next ID: 15
FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false];

// Next ID: 16
}

message FingerprintOptions {
bool stripped = 1;
}

// BulkOpSummary summarizes the data processed by an operation, counting the
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,21 @@ func evalExport(
opts.FingerprintOptions = storage.MVCCExportFingerprintOptions{
StripTenantPrefix: true,
StripValueChecksum: true,
StrippedVersion: args.FingerprintOptions.Stripped,
}
if opts.FingerprintOptions.StrippedVersion && args.MVCCFilter == kvpb.MVCCFilter_All {
// If a key's value were updated from a to b, the xor hash without
// timestamps of those two mvcc values would look the same if the key
// were updated from b to a. In other words, the order of key value
// updates without timestamps does not affect the xor hash; but this
// order clearly presents different mvcc history, therefore, do not
// strip timestamps if fingerprinting all mvcc history.
return result.Result{}, errors.New("cannot fingerprint without mvcc timestamps and with mvcc history")
}
if opts.FingerprintOptions.StrippedVersion && !args.StartTime.IsEmpty() {
// Supplying a startKey only complicates results (e.g. it surfaces
// tombstones), given that only the latest keys are surfaced.
return result.Result{}, errors.New("cannot fingerprint without mvcc timestamps and with a start time")
}
var hasRangeKeys bool
summary, resumeInfo, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(ctx,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"all_builtins.go",
"builtins.go",
"compression.go",
"fingerprint_builtins.go",
"fixed_oids.go",
"generator_builtins.go",
"generator_probe_ranges.go",
Expand Down
164 changes: 19 additions & 145 deletions pkg/sql/sem/builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/password"
Expand Down Expand Up @@ -77,10 +76,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -7491,6 +7487,24 @@ expires until the statement bundle is collected`,
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
},
tree.Overload{
Types: tree.ParamTypes{
{Name: "span", Typ: types.BytesArray},
{Name: "stripped", Typ: types.Bool},
// NB: The function can be called with an AOST clause that will be used
// as the `end_time` when issuing the ExportRequests for the purposes of
// fingerprinting.
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
if skipTimestamp := *args[1].(*tree.DBool); !skipTimestamp {
return nil, errors.New("cannot use crdb_internal.fingerprint skip timestamp overload with skip_ts=false")
}
return fingerprint(ctx, evalCtx, args, hlc.Timestamp{}, false, true)
},
Info: "This function is used only by CockroachDB's developers for testing purposes.",
Volatility: volatility.Stable,
},
tree.Overload{
Types: tree.ParamTypes{
{Name: "span", Typ: types.BytesArray},
Expand All @@ -7502,150 +7516,10 @@ expires until the statement bundle is collected`,
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
ctx, sp := tracing.ChildSpan(ctx, "crdb_internal.fingerprint")
defer sp.Finish()

if !evalCtx.Settings.Version.IsActive(ctx, clusterversion.V23_1) {
return nil, errors.Errorf("cannot use crdb_internal.fingerprint until the cluster version is at least %s",
clusterversion.V23_1.String())
}

isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
}
if !isAdmin {
return nil, errors.New("crdb_internal.fingerprint() requires admin privilege")
}

arr := tree.MustBeDArray(args[0])
if arr.Len() != 2 {
return nil, errors.New("expected an array of two elements")
}
startKey := []byte(tree.MustBeDBytes(arr.Array[0]))
endKey := []byte(tree.MustBeDBytes(arr.Array[1]))
startTime := args[1].(*tree.DTimestampTZ).Time
startTimestamp := hlc.Timestamp{WallTime: startTime.UnixNano()}
allRevisions := *args[2].(*tree.DBool)
filter := kvpb.MVCCFilter_Latest
if allRevisions {
filter = kvpb.MVCCFilter_All
}

req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey},
StartTime: startTimestamp,
MVCCFilter: filter,
ExportFingerprint: true,
}
header := kvpb.Header{
Timestamp: evalCtx.Txn.ReadTimestamp(),
// We set WaitPolicy to Error, so that the export will return an error
// to us instead of a blocking wait if it hits any other txns.
//
// TODO(adityamaru): We might need to handle WriteIntentErrors
// specially in the future so as to allow the fingerprint to complete
// in the face of intents.
WaitPolicy: lock.WaitPolicy_Error,
// TODO(ssd): Setting this disables async sending in
// DistSender so it likely substantially impacts
// performance.
ReturnElasticCPUResumeSpans: true,
}
admissionHeader := kvpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
CreateTime: timeutil.Now().UnixNano(),
Source: kvpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}

todo := make(chan *kvpb.ExportRequest, 1)
todo <- req
ctxDone := ctx.Done()
var fingerprint uint64
// TODO(adityamaru): Memory monitor this slice of buffered SSTs that
// contain range keys across ExportRequests.
ssts := make([][]byte, 0)
for {
select {
case <-ctxDone:
return nil, ctx.Err()
case req := <-todo:
var rawResp kvpb.Response
var pErr *kvpb.Error
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest fingerprint for span %s", roachpb.Span{Key: startKey, EndKey: endKey}),
5*time.Minute, func(ctx context.Context) error {
rawResp, pErr = kv.SendWrappedWithAdmission(ctx,
evalCtx.Txn.DB().NonTransactionalSender(), header, admissionHeader, req)
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
return nil, exportRequestErr
}

resp := rawResp.(*kvpb.ExportResponse)
for _, file := range resp.Files {
fingerprint = fingerprint ^ file.Fingerprint

// Aggregate all the range keys that need fingerprinting once all
// ExportRequests have been completed.
if len(file.SST) != 0 {
ssts = append(ssts, file.SST)
}
}
if resp.ResumeSpan != nil {
if !resp.ResumeSpan.Valid() {
return nil, errors.Errorf("invalid resume span: %s", resp.ResumeSpan)
}

resumeReq := req
resumeReq.RequestHeader = kvpb.RequestHeaderFromSpan(*resp.ResumeSpan)
todo <- resumeReq
}
default:
// No ExportRequests left to send. We've aggregated range keys
// across all ExportRequests and can now fingerprint them.
//
// NB: We aggregate rangekeys across ExportRequests and then
// fingerprint them on the client, instead of fingerprinting them as
// part of the ExportRequest command evaluation, because range keys
// do not have a stable, discrete identity. Their fragmentation can
// be influenced by rangekeys outside the time interval that we are
// fingerprinting, or by range splits. So, we need to "defragment"
// all the rangekey stacks we observe such that the fragmentation is
// deterministic on only the data we want to fingerprint in our key
// and time interval.
//
// Egs:
//
// t2 [-----)[----)
//
// t1 [----)[-----)
// a b c d
//
// Assume we have two rangekeys [a, c)@t1 and [b, d)@t2. They will
// fragment as shown in the diagram above. If we wish to fingerprint
// key [a-d) in time interval (t1, t2] the fragmented rangekey
// [a, c)@t1 is outside our time interval and should not influence our
// fingerprint. The iterator in `fingerprintRangekeys` will
// "defragment" the rangekey stacks [b-c)@t2 and [c-d)@t2 and
// fingerprint them as a single rangekey with bounds [b-d)@t2.
rangekeyFingerprint, err := storage.FingerprintRangekeys(ctx, evalCtx.Settings,
storage.MVCCExportFingerprintOptions{
StripTenantPrefix: true,
StripValueChecksum: true,
}, ssts)
if err != nil {
return nil, err
}
fingerprint = fingerprint ^ rangekeyFingerprint
return tree.NewDInt(tree.DInt(fingerprint)), nil
}
}
return fingerprint(ctx, evalCtx, args, startTimestamp, bool(allRevisions), false)
},
Info: "This function is used only by CockroachDB's developers for testing purposes.",
Volatility: volatility.Stable,
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/sem/builtins/fingerprint_builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,24 @@ FROM
db.QueryRow(t, fingerprintQuery).Scan(&fingerprint4)
require.NotEqual(t, fingerprint1, fingerprint4)
}

func TestFingerprintStripped(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(sqlDB)
db.Exec(t, "CREATE DATABASE IF NOT EXISTS test")
db.Exec(t, "CREATE TABLE test.test (k PRIMARY KEY) AS SELECT generate_series(1, 10)")

// Create the same sql rows in a different table, committed at a different timestamp.
db.Exec(t, "CREATE TABLE test.test2 (k PRIMARY KEY) AS SELECT generate_series(1, 10)")

strippedFingerprint := func(tableName string) int {
tableID := sqlutils.QueryTableID(t, sqlDB, "test", "public", tableName)
return sqlutils.FingerprintTable(t, db, tableID)
}
require.Equal(t, strippedFingerprint("test"), strippedFingerprint("test2"))
}
Loading

0 comments on commit dedb24d

Please sign in to comment.