diff --git a/.bazelversion b/.bazelversion index 7a22e8990c5f..a541fd8b0f51 100644 --- a/.bazelversion +++ b/.bazelversion @@ -1 +1 @@ -cockroachdb/5.0.0 +cockroachdb/5.1.0 diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index d3110b7941a2..6c62ea52862d 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1524,36 +1524,10 @@ func getBackupDetailAndManifest( user security.SQLUsername, ) (jobspb.BackupDetails, BackupManifest, error) { makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI - - mvccFilter := MVCCFilter_Latest - if initialDetails.RevisionHistory { - mvccFilter = MVCCFilter_All - } - endTime := initialDetails.EndTime - var targetDescs []catalog.Descriptor - var descriptorProtos []descpb.Descriptor - if initialDetails.FullCluster { - var err error - targetDescs, _, err = fullClusterTargetsBackup(ctx, execCfg, endTime) - if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err - } - descriptorProtos = make([]descpb.Descriptor, len(targetDescs)) - for i, desc := range targetDescs { - descriptorProtos[i] = *desc.DescriptorProto() - } - } else { - descriptorProtos = initialDetails.ResolvedTargets - targetDescs = make([]catalog.Descriptor, len(descriptorProtos)) - for i := range descriptorProtos { - targetDescs[i] = descbuilder.NewBuilder(&descriptorProtos[i]).BuildExistingMutable() - } - } - // TODO(pbardea): Refactor (defaultURI and urisByLocalityKV) pairs into a // backupDestination struct. collectionURI, defaultURI, resolvedSubdir, urisByLocalityKV, prevs, err := - resolveDest(ctx, user, initialDetails.Destination, endTime, initialDetails.IncrementalFrom, execCfg) + resolveDest(ctx, user, initialDetails.Destination, initialDetails.EndTime, initialDetails.IncrementalFrom, execCfg) if err != nil { return jobspb.BackupDetails{}, BackupManifest{}, err } @@ -1575,6 +1549,7 @@ func getBackupDetailAndManifest( prevBackups, encryptionOptions, memSize, err := fetchPreviousBackups(ctx, &mem, user, makeCloudStorage, prevs, *initialDetails.EncryptionOptions, kmsEnv) + if err != nil { return jobspb.BackupDetails{}, BackupManifest{}, err } @@ -1588,91 +1563,163 @@ func getBackupDetailAndManifest( !initialDetails.FullCluster { return jobspb.BackupDetails{}, BackupManifest{}, errors.Errorf("cannot append a backup of specific tables or databases to a cluster backup") } - } - var startTime hlc.Timestamp - if len(prevBackups) > 0 { if err := requireEnterprise(execCfg, "incremental"); err != nil { return jobspb.BackupDetails{}, BackupManifest{}, err } - startTime = prevBackups[len(prevBackups)-1].EndTime - } - - var tables []catalog.TableDescriptor - statsFiles := make(map[descpb.ID]string) - for _, desc := range targetDescs { - switch desc := desc.(type) { - case catalog.TableDescriptor: - tables = append(tables, desc) - // TODO (anzo): look into the tradeoffs of having all objects in the array to be in the same file, - // vs having each object in a separate file, or somewhere in between. - statsFiles[desc.GetID()] = backupStatisticsFileName - } } - clusterID := execCfg.ClusterID() for i := range prevBackups { // IDs are how we identify tables, and those are only meaningful in the // context of their own cluster, so we need to ensure we only allow // incremental previous backups that we created. - if fromCluster := prevBackups[i].ClusterID; !fromCluster.Equal(clusterID) { + if fromCluster := prevBackups[i].ClusterID; !fromCluster.Equal(execCfg.ClusterID()) { return jobspb.BackupDetails{}, BackupManifest{}, errors.Newf("previous BACKUP belongs to cluster %s", fromCluster.String()) } } - var newSpans roachpb.Spans - - var priorIDs map[descpb.ID]descpb.ID + // updatedDetails and backupManifest should be treated as read-only after + // they're returned from their respective functions. Future changes to those + // objects should be made within those functions. + updatedDetails, err := updateBackupDetails( + ctx, + initialDetails, + collectionURI, + defaultURI, + resolvedSubdir, + urisByLocalityKV, + prevBackups, + encryptionOptions, + kmsEnv) + if err != nil { + return jobspb.BackupDetails{}, BackupManifest{}, err + } - var revs []BackupManifest_DescriptorRevision - if mvccFilter == MVCCFilter_All { - priorIDs = make(map[descpb.ID]descpb.ID) - revs, err = getRelevantDescChanges(ctx, execCfg, startTime, endTime, targetDescs, - initialDetails.ResolvedCompleteDbs, priorIDs, initialDetails.FullCluster) - if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err - } + backupManifest, err := createBackupManifest( + ctx, + execCfg, + txn, + updatedDetails, + prevBackups) + if err != nil { + return jobspb.BackupDetails{}, BackupManifest{}, err } + return updatedDetails, backupManifest, nil +} + +func getTenantInfo( + ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, jobDetails jobspb.BackupDetails, +) ([]roachpb.Span, []descpb.TenantInfoWithUsage, error) { var spans []roachpb.Span var tenants []descpb.TenantInfoWithUsage - - if initialDetails.FullCluster && execCfg.Codec.ForSystemTenant() { + var err error + if jobDetails.FullCluster && execCfg.Codec.ForSystemTenant() { // Include all tenants. tenants, err = retrieveAllTenantsMetadata( ctx, execCfg.InternalExecutor, txn, ) if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + return nil, nil, err } - } else if len(initialDetails.SpecificTenantIds) > 0 { - for _, id := range initialDetails.SpecificTenantIds { + } else if len(jobDetails.SpecificTenantIds) > 0 { + for _, id := range jobDetails.SpecificTenantIds { tenantInfo, err := retrieveSingleTenantMetadata( ctx, execCfg.InternalExecutor, txn, id, ) if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + return nil, nil, err } tenants = append(tenants, tenantInfo) } } + if len(tenants) > 0 && jobDetails.RevisionHistory { + return spans, tenants, errors.UnimplementedError( + errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/47896"}, + "can not backup tenants with revision history", + ) + } + for i := range tenants { + prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(tenants[i].ID)) + spans = append(spans, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) + } + return spans, tenants, nil +} - if len(tenants) > 0 { - if initialDetails.RevisionHistory { - return jobspb.BackupDetails{}, BackupManifest{}, errors.UnimplementedError( - errors.IssueLink{IssueURL: "https://github.com/cockroachdb/cockroach/issues/47896"}, - "can not backup tenants with revision history", - ) +func createBackupManifest( + ctx context.Context, + execCfg *sql.ExecutorConfig, + txn *kv.Txn, + jobDetails jobspb.BackupDetails, + prevBackups []BackupManifest, +) (BackupManifest, error) { + mvccFilter := MVCCFilter_Latest + if jobDetails.RevisionHistory { + mvccFilter = MVCCFilter_All + } + endTime := jobDetails.EndTime + var targetDescs []catalog.Descriptor + var descriptorProtos []descpb.Descriptor + var err error + if jobDetails.FullCluster { + var err error + targetDescs, _, err = fullClusterTargetsBackup(ctx, execCfg, endTime) + if err != nil { + return BackupManifest{}, err } - for i := range tenants { - prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(tenants[i].ID)) - spans = append(spans, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) + descriptorProtos = make([]descpb.Descriptor, len(targetDescs)) + for i, desc := range targetDescs { + descriptorProtos[i] = *desc.DescriptorProto() + } + } else { + descriptorProtos = jobDetails.ResolvedTargets + targetDescs = make([]catalog.Descriptor, len(descriptorProtos)) + for i := range descriptorProtos { + targetDescs[i] = descbuilder.NewBuilder(&descriptorProtos[i]).BuildExistingMutable() } } + startTime := jobDetails.StartTime + + var tables []catalog.TableDescriptor + statsFiles := make(map[descpb.ID]string) + for _, desc := range targetDescs { + switch desc := desc.(type) { + case catalog.TableDescriptor: + tables = append(tables, desc) + // TODO (anzo): look into the tradeoffs of having all objects in the array to be in the same file, + // vs having each object in a separate file, or somewhere in between. + statsFiles[desc.GetID()] = backupStatisticsFileName + } + } + + var newSpans roachpb.Spans + var priorIDs map[descpb.ID]descpb.ID + + var revs []BackupManifest_DescriptorRevision + if mvccFilter == MVCCFilter_All { + priorIDs = make(map[descpb.ID]descpb.ID) + revs, err = getRelevantDescChanges(ctx, execCfg, startTime, endTime, targetDescs, + jobDetails.ResolvedCompleteDbs, priorIDs, jobDetails.FullCluster) + if err != nil { + return BackupManifest{}, err + } + } + + var spans []roachpb.Span + var tenants []descpb.TenantInfoWithUsage + tenantSpans, tenantInfos, err := getTenantInfo( + ctx, execCfg, txn, jobDetails, + ) + if err != nil { + return BackupManifest{}, err + } + spans = append(spans, tenantSpans...) + tenants = append(tenants, tenantInfos...) + tableSpans, err := spansForAllTableIndexes(execCfg, tables, revs) if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + return BackupManifest{}, err } spans = append(spans, tableSpans...) @@ -1689,14 +1736,14 @@ func getBackupDetailAndManifest( dbsInPrev[d] = struct{}{} } - if !initialDetails.FullCluster { + if !jobDetails.FullCluster { if err := checkForNewTables(ctx, execCfg.Codec, execCfg.DB, targetDescs, tablesInPrev, dbsInPrev, priorIDs, startTime, endTime); err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + return BackupManifest{}, err } // Let's check that we're not widening the scope of this backup to an // entire database, even if no tables were created in the meantime. - if err := checkForNewCompleteDatabases(targetDescs, initialDetails.ResolvedCompleteDbs, dbsInPrev); err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + if err := checkForNewCompleteDatabases(targetDescs, jobDetails.ResolvedCompleteDbs, dbsInPrev); err != nil { + return BackupManifest{}, err } } @@ -1704,7 +1751,7 @@ func getBackupDetailAndManifest( tableSpans, err := getReintroducedSpans(ctx, execCfg, prevBackups, tables, revs, endTime) if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + return BackupManifest{}, err } newSpans = append(newSpans, tableSpans...) } @@ -1717,7 +1764,7 @@ func getBackupDetailAndManifest( // of requiring full backups after schema changes remains. coverage := tree.RequestedDescriptors - if initialDetails.FullCluster { + if jobDetails.FullCluster { coverage = tree.AllDescriptors } @@ -1728,7 +1775,7 @@ func getBackupDetailAndManifest( Descriptors: descriptorProtos, Tenants: tenants, DescriptorChanges: revs, - CompleteDbs: initialDetails.ResolvedCompleteDbs, + CompleteDbs: jobDetails.ResolvedCompleteDbs, Spans: spans, IntroducedSpans: newSpans, FormatVersion: BackupFormatDescriptorTrackingVersion, @@ -1738,33 +1785,48 @@ func getBackupDetailAndManifest( StatisticsFilenames: statsFiles, DescriptorCoverage: coverage, } + if err := checkCoverage(ctx, backupManifest.Spans, append(prevBackups, backupManifest)); err != nil { + return BackupManifest{}, errors.Wrap(err, "new backup would not cover expected time") + } + return backupManifest, nil +} - // Verify this backup on its prior chain cover its spans up to its end time, - // as restore would do if it tried to restore this backup. - if err := checkCoverage(ctx, spans, append(prevBackups, backupManifest)); err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, errors.Wrap(err, "new backup would not cover expected time") +func updateBackupDetails( + ctx context.Context, + details jobspb.BackupDetails, + collectionURI string, + defaultURI string, + resolvedSubdir string, + urisByLocalityKV map[string]string, + prevBackups []BackupManifest, + encryptionOptions *jobspb.BackupEncryptionOptions, + kmsEnv *backupKMSEnv, +) (jobspb.BackupDetails, error) { + var err error + var startTime hlc.Timestamp + if len(prevBackups) > 0 { + startTime = prevBackups[len(prevBackups)-1].EndTime } // If we didn't load any prior backups from which get encryption info, we // need to generate encryption specific data. var encryptionInfo *jobspb.EncryptionInfo if encryptionOptions == nil { - encryptionOptions, encryptionInfo, err = makeNewEncryptionOptions(ctx, *initialDetails.EncryptionOptions, kmsEnv) + encryptionOptions, encryptionInfo, err = makeNewEncryptionOptions(ctx, *details.EncryptionOptions, kmsEnv) if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err + return jobspb.BackupDetails{}, err } } - return jobspb.BackupDetails{ - Destination: jobspb.BackupDetails_Destination{Subdir: resolvedSubdir}, - StartTime: startTime, - EndTime: endTime, - URI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - EncryptionOptions: encryptionOptions, - EncryptionInfo: encryptionInfo, - CollectionURI: collectionURI, - }, backupManifest, nil + details.Destination = jobspb.BackupDetails_Destination{Subdir: resolvedSubdir} + details.StartTime = startTime + details.URI = defaultURI + details.URIsByLocalityKV = urisByLocalityKV + details.EncryptionOptions = encryptionOptions + details.EncryptionInfo = encryptionInfo + details.CollectionURI = collectionURI + + return details, nil } func init() { diff --git a/pkg/storage/engine_key.go b/pkg/storage/engine_key.go index 6fe0fccaae48..fcf3b89dbfc2 100644 --- a/pkg/storage/engine_key.go +++ b/pkg/storage/engine_key.go @@ -42,6 +42,7 @@ type EngineKey struct { // their particular use case, that demultiplex on the various lengths below. // If adding another length to this list, remember to search for code // referencing these lengths and fix it. +// TODO(nvanbenschoten): unify these constants with those in mvcc_key.go. const ( engineKeyNoVersion = 0 engineKeyVersionWallTimeLen = 8 diff --git a/pkg/storage/intent_interleaving_iter.go b/pkg/storage/intent_interleaving_iter.go index f2acc81583c5..07ec2b1d4e6e 100644 --- a/pkg/storage/intent_interleaving_iter.go +++ b/pkg/storage/intent_interleaving_iter.go @@ -737,12 +737,24 @@ func (i *intentInterleavingIter) SeekLT(key MVCCKey) { return } if i.constraint != notConstrained { - i.checkConstraint(key.Key, true) - if i.constraint == constrainedToLocal && bytes.Equal(key.Key, keys.LocalMax) { + // If the seek key of SeekLT is the boundary between the local and global + // keyspaces, iterators constrained in either direction are permitted. + // Iterators constrained to the local keyspace may be scanning from their + // upper bound. Iterators constrained to the global keyspace may have found + // a key on the boundary and may now be scanning before the key, using the + // boundary as an exclusive upper bound. + // NB: an iterator with bounds [L, U) is allowed to SeekLT over any key in + // [L, U]. For local keyspace iterators, U can be LocalMax and for global + // keyspace iterators L can be LocalMax. + localMax := bytes.Equal(key.Key, keys.LocalMax) + if !localMax { + i.checkConstraint(key.Key, true) + } + if localMax && i.constraint == constrainedToLocal { // Move it down to below the lock table so can iterate down cleanly into // the local key space. Note that we disallow anyone using a seek key - // that is a local key above the lock table, and there should no keys in - // the engine there either (at least not keys that we need to see using + // that is a local key above the lock table, and there should be no keys + // in the engine there either (at least not keys that we need to see using // an MVCCIterator). key.Key = keys.LocalRangeLockTablePrefix } diff --git a/pkg/storage/intent_interleaving_iter_test.go b/pkg/storage/intent_interleaving_iter_test.go index 52be98904e4f..cd586706b286 100644 --- a/pkg/storage/intent_interleaving_iter_test.go +++ b/pkg/storage/intent_interleaving_iter_test.go @@ -413,7 +413,7 @@ func TestIntentInterleavingIterBoundaries(t *testing.T) { defer iter.Close() iter.SeekLT(MVCCKey{Key: keys.MaxKey}) }) - // Boundary cases for constrainedToGlobal + // Boundary cases for constrainedToGlobal. func() { opts := IterOptions{LowerBound: keys.LocalMax} iter := newIntentInterleavingIterator(eng, opts).(*intentInterleavingIter) @@ -427,13 +427,13 @@ func TestIntentInterleavingIterBoundaries(t *testing.T) { require.Equal(t, constrainedToGlobal, iter.constraint) iter.SetUpperBound(keys.LocalMax) }) - require.Panics(t, func() { + func() { opts := IterOptions{LowerBound: keys.LocalMax} iter := newIntentInterleavingIterator(eng, opts).(*intentInterleavingIter) defer iter.Close() require.Equal(t, constrainedToGlobal, iter.constraint) iter.SeekLT(MVCCKey{Key: keys.LocalMax}) - }) + }() // Panics for using a local key that is above the lock table. require.Panics(t, func() { opts := IterOptions{UpperBound: keys.LocalMax} diff --git a/pkg/storage/mvcc_key_test.go b/pkg/storage/mvcc_key_test.go index 093ee7e88101..4e8a2ea3d5b0 100644 --- a/pkg/storage/mvcc_key_test.go +++ b/pkg/storage/mvcc_key_test.go @@ -15,9 +15,11 @@ import ( "encoding/hex" "fmt" "math" + "math/rand" "reflect" "sort" "testing" + "testing/quick" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -71,6 +73,7 @@ func TestMVCCKeyCompare(t *testing.T) { b0 := MVCCKey{roachpb.Key("b"), hlc.Timestamp{Logical: 0}} b1 := MVCCKey{roachpb.Key("b"), hlc.Timestamp{Logical: 1}} b2 := MVCCKey{roachpb.Key("b"), hlc.Timestamp{Logical: 2}} + b2S := MVCCKey{roachpb.Key("b"), hlc.Timestamp{Logical: 2, Synthetic: true}} testcases := map[string]struct { a MVCCKey @@ -85,14 +88,76 @@ func TestMVCCKeyCompare(t *testing.T) { "empty time lt set": {b0, b1, -1}, // empty MVCC timestamps sort before non-empty "set time gt empty": {b1, b0, 1}, // empty MVCC timestamps sort before non-empty "key time precedence": {a1, b2, -1}, // a before b, but 2 before 1; key takes precedence + "synthetic equal": {b2, b2S, 0}, // synthetic bit does not affect ordering } for name, tc := range testcases { t.Run(name, func(t *testing.T) { require.Equal(t, tc.expect, tc.a.Compare(tc.b)) + require.Equal(t, tc.expect == 0, tc.a.Equal(tc.b)) + require.Equal(t, tc.expect < 0, tc.a.Less(tc.b)) + require.Equal(t, tc.expect > 0, tc.b.Less(tc.a)) + + // Comparators on encoded keys should be identical. + aEnc, bEnc := EncodeMVCCKey(tc.a), EncodeMVCCKey(tc.b) + require.Equal(t, tc.expect, EngineKeyCompare(aEnc, bEnc)) + require.Equal(t, tc.expect == 0, EngineKeyEqual(aEnc, bEnc)) }) } } +func TestMVCCKeyCompareRandom(t *testing.T) { + defer leaktest.AfterTest(t)() + + f := func(aGen, bGen randMVCCKey) bool { + a, b := MVCCKey(aGen), MVCCKey(bGen) + aEnc, bEnc := EncodeMVCCKey(a), EncodeMVCCKey(b) + + cmp := a.Compare(b) + cmpEnc := EngineKeyCompare(aEnc, bEnc) + eq := a.Equal(b) + eqEnc := EngineKeyEqual(aEnc, bEnc) + lessAB := a.Less(b) + lessBA := b.Less(a) + + if cmp != cmpEnc { + t.Logf("cmp (%v) != cmpEnc (%v)", cmp, cmpEnc) + return false + } + if eq != eqEnc { + t.Logf("eq (%v) != eqEnc (%v)", eq, eqEnc) + return false + } + if (cmp == 0) != eq { + t.Logf("(cmp == 0) (%v) != eq (%v)", cmp == 0, eq) + return false + } + if (cmp < 0) != lessAB { + t.Logf("(cmp < 0) (%v) != lessAB (%v)", cmp < 0, lessAB) + return false + } + if (cmp > 0) != lessBA { + t.Logf("(cmp > 0) (%v) != lessBA (%v)", cmp > 0, lessBA) + return false + } + return true + } + require.NoError(t, quick.Check(f, nil)) +} + +// randMVCCKey is a quick.Generator for MVCCKey. +type randMVCCKey MVCCKey + +func (k randMVCCKey) Generate(r *rand.Rand, size int) reflect.Value { + k.Key = []byte([...]string{"a", "b", "c"}[r.Intn(3)]) + k.Timestamp.WallTime = r.Int63n(5) + k.Timestamp.Logical = r.Int31n(5) + if !k.Timestamp.IsEmpty() { + // NB: the zero timestamp cannot be synthetic. + k.Timestamp.Synthetic = r.Intn(2) != 0 + } + return reflect.ValueOf(k) +} + func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) { defer leaktest.AfterTest(t)() @@ -113,7 +178,6 @@ func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) { "all": {"foo", hlc.Timestamp{WallTime: 1643550788737652545, Logical: 65535, Synthetic: true}, "666f6f0016cf10bc050557410000ffff010e"}, } for name, tc := range testcases { - tc := tc t.Run(name, func(t *testing.T) { // Test Encode/DecodeMVCCKey. @@ -172,6 +236,46 @@ func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) { } } +func TestDecodeUnnormalizedMVCCKey(t *testing.T) { + defer leaktest.AfterTest(t)() + + testcases := map[string]struct { + encoded string // hex-encoded + expected MVCCKey + equalToNormal bool + }{ + "zero logical": { + encoded: "666f6f0016cf10bc05055741000000000d", + expected: MVCCKey{Key: []byte("foo"), Timestamp: hlc.Timestamp{WallTime: 1643550788737652545, Logical: 0}}, + equalToNormal: true, + }, + "zero walltime and logical": { + encoded: "666f6f000000000000000000000000000d", + expected: MVCCKey{Key: []byte("foo"), Timestamp: hlc.Timestamp{WallTime: 0, Logical: 0}}, + // We could normalize this form in EngineKeyEqual and EngineKeyCompare, + // but doing so is not worth losing the fast-path byte comparison between + // keys that only contain (at most) a walltime. + equalToNormal: false, + }, + } + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + encoded, err := hex.DecodeString(tc.encoded) + require.NoError(t, err) + + decoded, err := DecodeMVCCKey(encoded) + require.NoError(t, err) + require.Equal(t, tc.expected, decoded) + + // Re-encode the key into its normal form. + reencoded := EncodeMVCCKey(decoded) + require.NotEqual(t, encoded, reencoded) + require.Equal(t, tc.equalToNormal, EngineKeyEqual(encoded, reencoded)) + require.Equal(t, tc.equalToNormal, EngineKeyCompare(encoded, reencoded) == 0) + }) + } +} + func TestDecodeMVCCKeyErrors(t *testing.T) { defer leaktest.AfterTest(t)() @@ -185,7 +289,6 @@ func TestDecodeMVCCKeyErrors(t *testing.T) { "invalid timestamp length suffix": {"ab00ffffffffffffffff0f", "invalid encoded mvcc key: ab00ffffffffffffffff0f"}, } for name, tc := range testcases { - tc := tc t.Run(name, func(t *testing.T) { encoded, err := hex.DecodeString(tc.encoded) require.NoError(t, err) @@ -208,7 +311,6 @@ func TestDecodeMVCCTimestampSuffixErrors(t *testing.T) { "invalid length suffix": {"ffffffffffffffff0f", "bad timestamp: found length suffix 15, actual length 9"}, } for name, tc := range testcases { - tc := tc t.Run(name, func(t *testing.T) { encoded, err := hex.DecodeString(tc.encoded) require.NoError(t, err) diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 2457a6798697..ccc54793d8ce 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -117,17 +117,106 @@ func EngineKeyCompare(a, b []byte) int { // Compare the version part of the key. Note that when the version is a // timestamp, the timestamp encoding causes byte comparison to be equivalent // to timestamp comparison. - aTS := a[aSep:aEnd] - bTS := b[bSep:bEnd] - if len(aTS) == 0 { - if len(bTS) == 0 { + aVer := a[aSep:aEnd] + bVer := b[bSep:bEnd] + if len(aVer) == 0 { + if len(bVer) == 0 { return 0 } return -1 - } else if len(bTS) == 0 { + } else if len(bVer) == 0 { return 1 } - return bytes.Compare(bTS, aTS) + aVer = normalizeEngineKeyVersionForCompare(aVer) + bVer = normalizeEngineKeyVersionForCompare(bVer) + return bytes.Compare(bVer, aVer) +} + +// EngineKeyEqual checks for equality of cockroach keys, including the version +// (which could be MVCC timestamps). +func EngineKeyEqual(a, b []byte) bool { + // NB: For performance, this routine manually splits the key into the + // user-key and version components rather than using DecodeEngineKey. In + // most situations, use DecodeEngineKey or GetKeyPartFromEngineKey or + // SplitMVCCKey instead of doing this. + aEnd := len(a) - 1 + bEnd := len(b) - 1 + if aEnd < 0 || bEnd < 0 { + // This should never happen unless there is some sort of corruption of + // the keys. + return bytes.Equal(a, b) + } + + // Last byte is the version length + 1 when there is a version, + // else it is 0. + aVerLen := int(a[aEnd]) + bVerLen := int(b[bEnd]) + + // Fast-path. If the key version is empty or contains only a walltime + // component then normalizeEngineKeyVersionForCompare is a no-op, so we don't + // need to split the "user key" from the version suffix before comparing to + // compute equality. Instead, we can check for byte equality immediately. + const withWall = mvccEncodedTimeSentinelLen + mvccEncodedTimeWallLen + if aVerLen <= withWall && bVerLen <= withWall { + return bytes.Equal(a, b) + } + + // Compute the index of the separator between the key and the version. If the + // separator is found to be at -1 for both keys, then we are comparing bare + // suffixes without a user key part. Pebble requires bare suffixes to be + // comparable with the same ordering as if they had a common user key. + aSep := aEnd - aVerLen + bSep := bEnd - bVerLen + if aSep == -1 && bSep == -1 { + aSep, bSep = 0, 0 // comparing bare suffixes + } + if aSep < 0 || bSep < 0 { + // This should never happen unless there is some sort of corruption of + // the keys. + return bytes.Equal(a, b) + } + + // Compare the "user key" part of the key. + if !bytes.Equal(a[:aSep], b[:bSep]) { + return false + } + + // Compare the version part of the key. + aVer := a[aSep:aEnd] + bVer := b[bSep:bEnd] + aVer = normalizeEngineKeyVersionForCompare(aVer) + bVer = normalizeEngineKeyVersionForCompare(bVer) + return bytes.Equal(aVer, bVer) +} + +var zeroLogical [mvccEncodedTimeLogicalLen]byte + +//gcassert:inline +func normalizeEngineKeyVersionForCompare(a []byte) []byte { + // In general, the version could also be a non-timestamp version, but we know + // that engineKeyVersionLockTableLen+mvccEncodedTimeSentinelLen is a different + // constant than the above, so there is no danger here of stripping parts from + // a non-timestamp version. + const withWall = mvccEncodedTimeSentinelLen + mvccEncodedTimeWallLen + const withLogical = withWall + mvccEncodedTimeLogicalLen + const withSynthetic = withLogical + mvccEncodedTimeSyntheticLen + if len(a) == withSynthetic { + // Strip the synthetic bit component from the timestamp version. The + // presence of the synthetic bit does not affect key ordering or equality. + a = a[:withLogical] + } + if len(a) == withLogical { + // If the timestamp version contains a logical timestamp component that is + // zero, strip the component. encodeMVCCTimestampToBuf will typically omit + // the entire logical component in these cases as an optimization, but it + // does not guarantee to never include a zero logical component. + // Additionally, we can fall into this case after stripping off other + // components of the key version earlier on in this function. + if bytes.Equal(a[withWall:], zeroLogical[:]) { + a = a[:withWall] + } + } + return a } // EngineComparer is a pebble.Comparer object that implements MVCC-specific @@ -135,6 +224,8 @@ func EngineKeyCompare(a, b []byte) int { var EngineComparer = &pebble.Comparer{ Compare: EngineKeyCompare, + Equal: EngineKeyEqual, + AbbreviatedKey: func(k []byte) uint64 { key, ok := GetKeyPartFromEngineKey(k) if !ok { diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index b9f714db37f3..b5a4ecb12e09 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -28,11 +29,14 @@ import ( "github.com/cockroachdb/pebble" ) -const ( - maxItersBeforeSeek = 10 +// Key value lengths take up 8 bytes (2 x Uint32). +const kvLenSize = 8 - // Key value lengths take up 8 bytes (2 x Uint32). - kvLenSize = 8 +var maxItersBeforeSeek = util.ConstantWithMetamorphicTestRange( + "mvcc-max-iters-before-seek", + 10, /* defaultValue */ + 0, /* min */ + 3, /* max */ ) // Struct to store MVCCScan / MVCCGet in the same binary format as that @@ -356,7 +360,7 @@ type pebbleMVCCScanner struct { // Stores any error returned. If non-nil, iteration short circuits. err error // Number of iterations to try before we do a Seek/SeekReverse. Stays within - // [1, maxItersBeforeSeek] and defaults to maxItersBeforeSeek/2 . + // [0, maxItersBeforeSeek] and defaults to maxItersBeforeSeek/2 . itersBeforeSeek int } @@ -483,8 +487,8 @@ func (p *pebbleMVCCScanner) incrementItersBeforeSeek() { // Decrements itersBeforeSeek while ensuring it stays positive. func (p *pebbleMVCCScanner) decrementItersBeforeSeek() { p.itersBeforeSeek-- - if p.itersBeforeSeek < 1 { - p.itersBeforeSeek = 1 + if p.itersBeforeSeek < 0 { + p.itersBeforeSeek = 0 } } @@ -971,6 +975,13 @@ func (p *pebbleMVCCScanner) addAndAdvance( func (p *pebbleMVCCScanner) seekVersion( ctx context.Context, seekTS hlc.Timestamp, uncertaintyCheck bool, ) bool { + if seekTS.IsEmpty() { + // If the seek timestamp is empty, we've already seen all versions of this + // key, so seek to the next key. Seeking to version zero of the current key + // would be incorrect, as version zero is stored before all other versions. + return p.advanceKey() + } + seekKey := MVCCKey{Key: p.curUnsafeKey.Key, Timestamp: seekTS} p.keyBuf = EncodeMVCCKeyToBuf(p.keyBuf[:0], seekKey) origKey := p.keyBuf[:len(p.curUnsafeKey.Key)] diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index d087eb1715fe..44409107043d 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -14,7 +14,6 @@ import ( "bytes" "context" "fmt" - "io/ioutil" "math" "math/rand" "path/filepath" @@ -602,26 +601,42 @@ func TestPebbleIterConsistency(t *testing.T) { } func BenchmarkMVCCKeyCompare(b *testing.B) { + keys := makeRandEncodedKeys() + b.ResetTimer() + for i, j := 0, 0; i < b.N; i, j = i+1, j+3 { + _ = EngineKeyCompare(keys[i%len(keys)], keys[j%len(keys)]) + } +} + +func BenchmarkMVCCKeyEqual(b *testing.B) { + keys := makeRandEncodedKeys() + b.ResetTimer() + for i, j := 0, 0; i < b.N; i, j = i+1, j+3 { + _ = EngineKeyEqual(keys[i%len(keys)], keys[j%len(keys)]) + } +} + +func makeRandEncodedKeys() [][]byte { rng := rand.New(rand.NewSource(timeutil.Now().Unix())) keys := make([][]byte, 1000) for i := range keys { k := MVCCKey{ - Key: randutil.RandBytes(rng, 8), + Key: []byte("shared" + [...]string{"a", "b", "c"}[rng.Intn(3)]), Timestamp: hlc.Timestamp{ - WallTime: int64(rng.Intn(5)), + WallTime: rng.Int63n(5), }, } + if rng.Int31n(5) == 0 { + // 20% of keys have a logical component. + k.Timestamp.Logical = rng.Int31n(4) + 1 + } + if rng.Int31n(1000) == 0 && !k.Timestamp.IsEmpty() { + // 0.1% of keys have a synthetic component. + k.Timestamp.Synthetic = true + } keys[i] = EncodeMVCCKey(k) } - - b.ResetTimer() - var c int - for i, j := 0, 0; i < b.N; i, j = i+1, j+3 { - c = EngineKeyCompare(keys[i%len(keys)], keys[j%len(keys)]) - } - if testing.Verbose() { - fmt.Fprint(ioutil.Discard, c) - } + return keys } type testValue struct { diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index adebc98eb49f..8d0e18c98659 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1996,6 +1996,7 @@ func TestLint(t *testing.T) { "../../sql/colfetcher", "../../sql/row", "../../kv/kvclient/rangecache", + "../../storage", ); err != nil { t.Fatal(err) }