From 2d4b60173a62ba7eab2ca0bd0a59bb5226b30ed1 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Thu, 28 Jul 2022 15:26:08 -0400
Subject: [PATCH] kv/bulk: write ImportEpoch to each MVCCValue during IMPORT

This patch makes IMPORT INTO on a non-empty table write the table's
ImportEpoch to each ingested MVCCValue, via the SSTBatcher. In a future
PR, the ImportEpoch will be used to roll back an IMPORT INTO in some
cases. This additional information will allow IMPORTing tables to be
backed up and restored. As part of this change, we now also assume that
we might see an MVCCValue during restore.

* Version Gating

Previously, callers could (and did) assume that the values present in
the SSTs returned by ExportRequest could be interpreted directly as
roachpb.Value objects using code like:

    roachpb.Value{RawBytes: valBytes}

For MVCCValueHeaders to be exported by ExportRequest, all callers need
to be updated:

1. ExportRequest on system.descriptors in sql/catalog/lease
2. ExportRequest on system.descriptors in ccl/changefeedccl/schemafeed
3. ExportRequest used by `FINGERPRINT`
4. ExportRequest used by old binaries in a mixed-version cluster.

(1) and (2) will be easy to update and likely don't matter in practice,
since those tables do not currently contain values with exportable
value headers.

(3) will be easy to update, but we still need an option to exclude
value headers (a) until value headers are included in rangefeeds and
(b) for as long as we want to compare fingerprints with 23.2 versions.

(4) is impossible to update, so if we want BACKUP/RESTORE to round-trip
in a mixed-version cluster we must version gate the inclusion of value
headers in backups until the cluster is on a single version.

To account for this, we only increment ImportEpoch during IMPORTs that
start on 24.1 or greater, and we only request MVCCValueHeaders on
BACKUPs that start on 24.1 or greater. The first condition is important
to ensure that we can later detect a table that can be fully rolled
back using the new rollback method.

Note that this also marks a hard backward incompatibility for backup
artifacts: backups taken on 24.1 cannot be restored on 23.2 or older.
This was already the case by policy. 23.2 backups should still work
fine on 24.1, since all roachpb.Values should properly decode as
MVCCValues.
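For illustration only (not part of this diff), a minimal sketch of how
a reader of exported SSTs might decode values now that an
MVCCValueHeader may be present. The helper name decodeExportedValue is
hypothetical; storage.DecodeMVCCValue and
storage.DecodeValueFromMVCCValue are the helpers touched in this patch:

    // Sketch only (hypothetical helper): decode a value read from an
    // exported SST that may now carry an MVCCValueHeader.
    func decodeExportedValue(rawVal []byte) (roachpb.Value, uint32, error) {
        // Full decode parses the header as well as the roachpb.Value.
        mvccVal, err := storage.DecodeMVCCValue(rawVal)
        if err != nil {
            return roachpb.Value{}, 0, err
        }
        // Callers that only need the roachpb.Value can instead use
        // storage.DecodeValueFromMVCCValue(rawVal), which skips header
        // parsing; previously this was roachpb.Value{RawBytes: rawVal}.
        return mvccVal.Value, mvccVal.ImportEpoch, nil
    }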
Informs #76722 Release note: None Co-authored-by: Steven Danna --- pkg/ccl/backupccl/backup_job.go | 1 + pkg/ccl/backupccl/backup_processor.go | 14 +-- .../backupccl/backup_processor_planning.go | 45 +++---- pkg/ccl/backupccl/restore_data_processor.go | 15 ++- pkg/kv/bulk/buffering_adder.go | 20 ++- pkg/kv/bulk/sst_batcher.go | 18 ++- pkg/kv/bulk/sst_batcher_test.go | 77 +++++++++++- pkg/kv/kvpb/api.proto | 9 +- pkg/kv/kvserver/batcheval/cmd_export.go | 1 + pkg/kv/kvserver/kvserverbase/bulk_adder.go | 9 ++ pkg/sql/execinfrapb/processors_bulk_io.proto | 9 +- pkg/sql/importer/BUILD.bazel | 3 + pkg/sql/importer/import_job.go | 27 +++- pkg/sql/importer/import_mvcc_test.go | 116 ++++++++++++++++++ pkg/sql/importer/import_processor.go | 12 ++ pkg/sql/importer/import_processor_test.go | 6 +- pkg/storage/mvcc.go | 18 ++- pkg/storage/mvcc_value.go | 40 ++++-- pkg/storage/mvcc_value_test.go | 67 ++++++++++ 19 files changed, 451 insertions(+), 56 deletions(-) create mode 100644 pkg/sql/importer/import_mvcc_test.go diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index bb20648c59db..6ef7bc31db92 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -244,6 +244,7 @@ func backup( backupManifest.StartTime, backupManifest.EndTime, backupManifest.ElidedPrefix, + backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()), ) if err != nil { return roachpb.RowCount{}, 0, err diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index f44c604b91a1..7eba8ba4dc70 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -487,14 +487,14 @@ func runBackupProcessor( if !span.firstKeyTS.IsEmpty() { splitMidKey = true } - req := &kvpb.ExportRequest{ - RequestHeader: kvpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - SplitMidKey: splitMidKey, + RequestHeader: kvpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + SplitMidKey: splitMidKey, + IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader, } // If we're doing re-attempts but are not yet in the priority regime, diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 9edcccb37f59..91b48f9b005b 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -46,6 +46,7 @@ func distBackupPlanSpecs( mvccFilter kvpb.MVCCFilter, startTime, endTime hlc.Timestamp, elide execinfrapb.ElidePrefix, + includeValueHeader bool, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") @@ -98,17 +99,18 @@ func distBackupPlanSpecs( sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec) for _, partition := range spanPartitions { spec := &execinfrapb.BackupDataSpec{ - JobID: jobID, - Spans: partition.Spans, - DefaultURI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - MVCCFilter: mvccFilter, - Encryption: fileEncryption, - PKIDs: pkIDs, - BackupStartTime: startTime, - BackupEndTime: endTime, - UserProto: user.EncodeProto(), - ElidePrefix: elide, + JobID: jobID, + Spans: partition.Spans, + 
DefaultURI: defaultURI, + URIsByLocalityKV: urisByLocalityKV, + MVCCFilter: mvccFilter, + Encryption: fileEncryption, + PKIDs: pkIDs, + BackupStartTime: startTime, + BackupEndTime: endTime, + UserProto: user.EncodeProto(), + ElidePrefix: elide, + IncludeMVCCValueHeader: includeValueHeader, } sqlInstanceIDToSpec[partition.SQLInstanceID] = spec } @@ -121,16 +123,17 @@ func distBackupPlanSpecs( // which is not the leaseholder for any of the spans, but is for an // introduced span. spec := &execinfrapb.BackupDataSpec{ - JobID: jobID, - IntroducedSpans: partition.Spans, - DefaultURI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, - MVCCFilter: mvccFilter, - Encryption: fileEncryption, - PKIDs: pkIDs, - BackupStartTime: startTime, - BackupEndTime: endTime, - UserProto: user.EncodeProto(), + JobID: jobID, + IntroducedSpans: partition.Spans, + DefaultURI: defaultURI, + URIsByLocalityKV: urisByLocalityKV, + MVCCFilter: mvccFilter, + Encryption: fileEncryption, + PKIDs: pkIDs, + BackupStartTime: startTime, + BackupEndTime: endTime, + UserProto: user.EncodeProto(), + IncludeMVCCValueHeader: includeValueHeader, } sqlInstanceIDToSpec[partition.SQLInstanceID] = spec } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index e2ecf7d3ea4e..baa7d7a19ffa 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( return summary, err } valueScratch = append(valueScratch[:0], v...) - value := roachpb.Value{RawBytes: valueScratch} + value, err := storage.DecodeValueFromMVCCValue(valueScratch) + if err != nil { + return summary, err + } key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime) @@ -581,7 +583,14 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( if verbose { log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint()) } - if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil { + + // Using valueScratch here assumes that + // DecodeValueFromMVCCValue, ClearChecksum, and + // InitChecksum don't copy/reallocate the slice they + // were given. We expect that value.ClearChecksum and + // value.InitChecksum calls above have modified + // valueScratch. + if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil { return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint()) } } diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go index 3de6446cdb60..ef585396b84a 100644 --- a/pkg/kv/bulk/buffering_adder.go +++ b/pkg/kv/bulk/buffering_adder.go @@ -63,6 +63,12 @@ type BufferingAdder struct { // name of the BufferingAdder for the purpose of logging only. name string + // importEpoch specifies the ImportEpoch of the table the BufferingAdder + // is ingesting data as part of an IMPORT INTO job. If specified, the Bulk + // Adder's SSTBatcher will write the import epoch to each versioned value's + // metadata. 
+ importEpoch uint32 + bulkMon *mon.BytesMonitor memAcc mon.BoundAccount @@ -96,7 +102,8 @@ func MakeBulkAdder( } b := &BufferingAdder{ - name: opts.Name, + name: opts.Name, + importEpoch: opts.ImportEpoch, sink: SSTBatcher{ name: opts.Name, db: db, @@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { for i := range b.curBuf.entries { mvccKey.Key = b.curBuf.Key(i) - if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil { - return err + if b.importEpoch != 0 { + if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i), + b.importEpoch); err != nil { + return err + } + } else { + if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil { + return err + } } } if err := b.sink.Flush(ctx); err != nil { diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 51dd1dc4bb23..e19ec6b05a03 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) { b.mu.onFlush = onFlush } +func (b *SSTBatcher) AddMVCCKeyWithImportEpoch( + ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32, +) error { + mvccVal, err := storage.DecodeMVCCValue(value) + if err != nil { + return err + } + mvccVal.MVCCValueHeader.ImportEpoch = importEpoch + encVal, err := storage.EncodeMVCCValue(mvccVal) + if err != nil { + return err + } + return b.AddMVCCKey(ctx, key, encVal) +} + // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed). // This is only for callers that want to control the timestamp on individual // keys -- like RESTORE where we want the restored data to look like the backup. @@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value if !b.disallowShadowingBelow.IsEmpty() { b.updateMVCCStats(key, value) } - - return b.sstWriter.Put(key, value) + return b.sstWriter.PutRawMVCC(key, value) } // Reset clears all state in the batcher and prepares it for reuse. 
diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go index 4912a4bbe3e3..85082fdfddd0 100644 --- a/pkg/kv/bulk/sst_batcher_test.go +++ b/pkg/kv/bulk/sst_batcher_test.go @@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) { mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil) reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000) b, err := bulk.MakeBulkAdder( - ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs, + ctx, kvDB, mockCache, s.ClusterSettings(), ts, + kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs, ) require.NoError(t, err) @@ -361,3 +362,77 @@ func runTestImport(t *testing.T, batchSizeValue int64) { }) } } + +var DummyImportEpoch uint32 = 3 + +func TestImportEpochIngestion(t *testing.T) { + defer leaktest.AfterTest(t)() + + defer log.Scope(t).Close(t) + ctx := context.Background() + + mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil) + reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000) + s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(), + false, true, mem.MakeConcurrentBoundAccount(), reqs) + require.NoError(t, err) + defer b.Close(ctx) + + startKey := storageutils.PointKey("a", 1) + endKey := storageutils.PointKey("b", 1) + value := storageutils.StringValueRaw("myHumbleValue") + mvccValue, err := storage.DecodeMVCCValue(value) + require.NoError(t, err) + + require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch)) + require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch)) + require.NoError(t, b.Flush(ctx)) + + // Check that ingested key contains the dummy job ID + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: startKey.Key, + EndKey: endKey.Key, + }, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + IncludeMVCCValueHeader: true, + } + + header := kvpb.Header{Timestamp: s.Clock().Now()} + resp, roachErr := kv.SendWrappedWith(ctx, + kvDB.NonTransactionalSender(), header, req) + require.NoError(t, roachErr.GoError()) + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: startKey.Key, + UpperBound: endKey.Key, + } + + checkedJobId := false + for _, file := range resp.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts) + require.NoError(t, err) + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + rawVal, err := it.UnsafeValue() + require.NoError(t, err) + val, err := storage.DecodeMVCCValue(rawVal) + require.NoError(t, err) + require.Equal(t, startKey, it.UnsafeKey()) + require.Equal(t, mvccValue.Value, val.Value) + require.Equal(t, DummyImportEpoch, val.ImportEpoch) + require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp) + checkedJobId = true + } + } + require.Equal(t, true, checkedJobId) +} diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 75a5b8031ede..4fcedd9f5dab 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -1831,7 +1831,14 @@ message ExportRequest { FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false]; - // Next ID: 16 + // IncludeMVCCValueHeader controls whether the MVCCValueHeader is + // included in exported bytes. 
Callers should only set this when all + readers of the returned SST are prepared to parse a full + MVCCValue. Even when set, only fields appropriate for export are + included. See storage.EncodeMVCCValueForExport for details. + bool include_mvcc_value_header = 16 [(gogoproto.customname) = "IncludeMVCCValueHeader"]; + + // Next ID: 17 } message FingerprintOptions { diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 6a1705004a6d..dea1d60e7487 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -197,6 +197,7 @@ func evalExport( TargetLockConflictBytes: targetLockConflictBytes, StopMidKey: args.SplitMidKey, ScanStats: cArgs.ScanStats, + IncludeMVCCValueHeader: args.IncludeMVCCValueHeader, } var summary kvpb.BulkOpSummary var resumeInfo storage.ExportRequestResumeInfo diff --git a/pkg/kv/kvserver/kvserverbase/bulk_adder.go b/pkg/kv/kvserver/kvserverbase/bulk_adder.go index 62323091a093..601eb1a3bc26 100644 --- a/pkg/kv/kvserver/kvserverbase/bulk_adder.go +++ b/pkg/kv/kvserver/kvserverbase/bulk_adder.go @@ -69,6 +69,15 @@ type BulkAdderOptions struct { // the first buffer to pick split points in the hope it is a representative // sample of the overall input. InitialSplitsIfUnordered int + + // ImportEpoch specifies the ImportEpoch of the table the BulkAdder + // is ingesting data into as part of an IMPORT INTO job. If specified, the Bulk + // Adder's SSTBatcher will write the import epoch to each versioned value's + // metadata. + // + // Callers should check that the cluster is at or above + // version 24.1 before setting this option. + ImportEpoch uint32 } // BulkAdderFactory describes a factory function for BulkAdders. diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 5073bc3d99ce..ad8eb0bd48c6 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -318,7 +318,14 @@ message BackupDataSpec { optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false]; - // NEXTID: 13. + + // IncludeMVCCValueHeader indicates whether the backup should be + // created with MVCCValueHeaders in the exported data. This should + // only be set on backups starting on cluster version 24.1 or + // greater. + optional bool include_mvcc_value_header = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IncludeMVCCValueHeader"]; + + // NEXTID: 14. 
} message RestoreFileSpec { diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 323e5d8acb2e..6a8008f0e59a 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -148,6 +148,7 @@ go_test( "exportparquet_test.go", "import_csv_mark_redaction_test.go", "import_into_test.go", + "import_mvcc_test.go", "import_processor_test.go", "import_stmt_test.go", "main_test.go", @@ -187,6 +188,7 @@ go_test( "//pkg/keys", "//pkg/keyvisualizer", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", @@ -229,6 +231,7 @@ go_test( "//pkg/sql/stats", "//pkg/sql/tests", "//pkg/sql/types", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/jobutils", diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 30fedbe7936a..f7e8cda2259c 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -292,6 +292,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + if len(details.Tables) > 1 { + for _, tab := range details.Tables { + if !tab.IsNew { + return errors.AssertionFailedf("all tables in multi-table import must be new") + } + } + } + procsPerNode := int(processorsPerNode.Get(&p.ExecCfg().Settings.SV)) res, err := ingestWithRetry(ctx, p, r.job, tables, typeDescs, files, format, details.Walltime, @@ -400,9 +408,11 @@ func (r *importResumer) prepareTablesForIngestion( var err error var newTableDescs []jobspb.ImportDetails_Table var desc *descpb.TableDescriptor + + useImportEpochs := p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V24_1) for i, table := range details.Tables { if !table.IsNew { - desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc) + desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc, useImportEpochs) if err != nil { return importDetails, err } @@ -480,7 +490,11 @@ func (r *importResumer) prepareTablesForIngestion( // prepareExistingTablesForIngestion prepares descriptors for existing tables // being imported into. func prepareExistingTablesForIngestion( - ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, desc *descpb.TableDescriptor, + ctx context.Context, + txn *kv.Txn, + descsCol *descs.Collection, + desc *descpb.TableDescriptor, + useImportEpochs bool, ) (*descpb.TableDescriptor, error) { if len(desc.Mutations) > 0 { return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String()) @@ -500,7 +514,14 @@ func prepareExistingTablesForIngestion( // Take the table offline for import. // TODO(dt): audit everywhere we get table descs (leases or otherwise) to // ensure that filtering by state handles IMPORTING correctly. - importing.OfflineForImport() + + // We only use the new OfflineForImport on 24.1, which bumps + // the ImportEpoch, if we are completely on 24.1. + if useImportEpochs { + importing.OfflineForImport() + } else { + importing.SetOffline(tabledesc.OfflineReasonImporting) + } // TODO(dt): de-validate all the FKs. if err := descsCol.WriteDesc( diff --git a/pkg/sql/importer/import_mvcc_test.go b/pkg/sql/importer/import_mvcc_test.go new file mode 100644 index 000000000000..f1225b10576a --- /dev/null +++ b/pkg/sql/importer/import_mvcc_test.go @@ -0,0 +1,116 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package importer_test + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestMVCCValueHeaderImportEpoch tests that the import job ID is properly +// stored in the MVCCValueHeader in an imported key's MVCCValue. +func TestMVCCValueHeaderImportEpoch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + server, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s := server.ApplicationLayer() + defer server.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(db) + + sqlDB.Exec(t, `CREATE DATABASE d`) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + fmt.Fprint(w, "1") + } + })) + defer srv.Close() + + // Create a table where the first row ( in sort order) comes from an IMPORT + // while the second comes from an INSERT. + sqlDB.Exec(t, `CREATE TABLE d.t (a INT8)`) + sqlDB.Exec(t, `INSERT INTO d.t VALUES ('2')`) + sqlDB.Exec(t, `IMPORT INTO d.t CSV DATA ($1)`, srv.URL) + + // Conduct an export request to iterate over the keys in the table. 
+ var tableID uint32 + sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, + "t").Scan(&tableID) + + startKey := s.Codec().TablePrefix(tableID) + endKey := startKey.PrefixEnd() + + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: startKey, + EndKey: endKey, + }, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + IncludeMVCCValueHeader: true, + } + + header := kvpb.Header{Timestamp: s.Clock().Now()} + resp, roachErr := kv.SendWrappedWith(ctx, + s.DistSenderI().(*kvcoord.DistSender), header, req) + require.NoError(t, roachErr.GoError()) + + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: startKey, + UpperBound: endKey, + } + + // Ensure there are 2 keys in the span, and only the first one contains job ID metadata + keyCount := 0 + for _, file := range resp.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts) + require.NoError(t, err) + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + rawVal, err := it.UnsafeValue() + require.NoError(t, err) + val, err := storage.DecodeMVCCValue(rawVal) + require.NoError(t, err) + if keyCount == 0 { + require.NotEqual(t, uint32(0), val.ImportEpoch) + } else if keyCount == 1 { + require.Equal(t, uint32(0), val.ImportEpoch) + } else { + t.Fatal("more than 2 keys in the table") + } + require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp) + keyCount++ + } + } +} diff --git a/pkg/sql/importer/import_processor.go b/pkg/sql/importer/import_processor.go index ef79797eac16..f49441738cad 100644 --- a/pkg/sql/importer/import_processor.go +++ b/pkg/sql/importer/import_processor.go @@ -389,6 +389,16 @@ func ingestKvs( // will hog memory as it tries to grow more aggressively. 
minBufferSize, maxBufferSize := importBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */) + + var bulkAdderImportEpoch uint32 + for _, v := range spec.Tables { + if bulkAdderImportEpoch == 0 { + bulkAdderImportEpoch = v.Desc.ImportEpoch + } else if bulkAdderImportEpoch != v.Desc.ImportEpoch { + return nil, errors.AssertionFailedf("inconsistent import epoch on multi-table import") + } + } + pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeTS, kvserverbase.BulkAdderOptions{ Name: pkAdderName, DisallowShadowingBelow: writeTS, @@ -397,6 +407,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err @@ -413,6 +424,7 @@ func ingestKvs( MaxBufferSize: maxBufferSize, InitialSplitsIfUnordered: int(spec.InitialSplits), WriteAtBatchTimestamp: true, + ImportEpoch: bulkAdderImportEpoch, }) if err != nil { return nil, err diff --git a/pkg/sql/importer/import_processor_test.go b/pkg/sql/importer/import_processor_test.go index 3a959327ae6f..a1ecc42be252 100644 --- a/pkg/sql/importer/import_processor_test.go +++ b/pkg/sql/importer/import_processor_test.go @@ -243,7 +243,7 @@ func TestImportIgnoresProcessedFiles(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -368,7 +368,7 @@ func TestImportHonorsResumePosition(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( @@ -502,7 +502,7 @@ func TestImportHandlesDuplicateKVs(t *testing.T) { Mon: evalCtx.TestingMon, Cfg: &execinfra.ServerConfig{ JobRegistry: &jobs.Registry{}, - Settings: &cluster.Settings{}, + Settings: cluster.MakeTestingClusterSettings(), ExternalStorage: externalStorageFactory, DB: fakeDB{}, BulkAdder: func( diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 5ecf5fa6b705..ab63da1d7116 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7884,9 +7884,13 @@ func mvccExportToWriter( return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } - unsafeValue, err = EncodeMVCCValueForExport(mvccValue) - if err != nil { - return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) + if opts.IncludeMVCCValueHeader { + unsafeValue, err = EncodeMVCCValueForExport(mvccValue) + if err != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) + } + } else { + unsafeValue = mvccValue.Value.RawBytes } // Skip tombstone records when start time is zero (non-incremental) // and we are not exporting all versions. @@ -8053,6 +8057,14 @@ type MVCCExportOptions struct { // FingerprintOptions controls how fingerprints are generated // when using MVCCExportFingerprint. FingerprintOptions MVCCExportFingerprintOptions + + // IncludeMVCCValueHeader controls whether we include + // MVCCValueHeaders in the exported data. When true, the + // portions of the header appropriate for export are included + // in the encoded values. 
Callers should be ready to decode + full MVCCValues in this case. + IncludeMVCCValueHeader bool + + // ScanStats, if set, is updated with iterator stats upon export success of + // failure. Non-iterator stats i.e., {NumGets,NumReverseScans} are left + // unchanged, and NumScans is incremented by 1. diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 27bb99fad239..2b8ea7f426f4 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -134,19 +134,17 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) { w.Print(v.Value.PrettyPrint()) } -// EncodeMVCCValueForExport strips fields from the MVCCValueHeader that -// should not get exported out of the cluster. -// -//gcassert:inline +// EncodeMVCCValueForExport encodes fields from the MVCCValueHeader +// that are appropriate for export out of the cluster. func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) { - // Consider a fast path, where only the roachpb.Value gets exported. - // Currently, this only occurs if the value was not imported. if mvccValue.ImportEpoch == 0 { return mvccValue.Value.RawBytes, nil } - // Manually strip off any non-exportable fields, and re-encode the mvcc value. - mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{} + // We only export ImportEpoch. + mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{ + ImportEpoch: mvccValue.ImportEpoch, + } return EncodeMVCCValue(mvccValue) } @@ -234,6 +232,32 @@ func DecodeMVCCValue(buf []byte) (MVCCValue, error) { return decodeExtendedMVCCValue(buf) } +// DecodeValueFromMVCCValue decodes an MVCCValue and returns the +// roachpb.Value portion without parsing the MVCCValueHeader. +// +// NB: Caller assumes that this function does not copy or re-allocate +// the underlying byte slice. +func DecodeValueFromMVCCValue(buf []byte) (roachpb.Value, error) { + if len(buf) == 0 { + // Tombstone with no header. + return roachpb.Value{}, nil + } + if len(buf) <= tagPos { + return roachpb.Value{}, errMVCCValueMissingTag + } + if buf[tagPos] != extendedEncodingSentinel { + return roachpb.Value{RawBytes: buf}, nil + } + + // Extended encoding + headerLen := binary.BigEndian.Uint32(buf) + headerSize := extendedPreludeSize + headerLen + if len(buf) < int(headerSize) { + return roachpb.Value{}, errMVCCValueMissingHeader + } + return roachpb.Value{RawBytes: buf[headerSize:]}, nil +} + // DecodeMVCCValueAndErr is a helper that can be called using the ([]byte, // error) pair returned from the iterator UnsafeValue(), Value() methods. 
func DecodeMVCCValueAndErr(buf []byte, err error) (MVCCValue, error) { diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index 6a2084390a3b..b5bfd6e4151a 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -211,6 +211,24 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { return buf.String() })) + t.Run("DeocdeValueFromMVCCValue/"+name, func(t *testing.T) { + enc, err := EncodeMVCCValue(tc.val) + require.NoError(t, err) + assert.Equal(t, encodedMVCCValueSize(tc.val), len(enc)) + + dec, err := DecodeValueFromMVCCValue(enc) + require.NoError(t, err) + + if len(dec.RawBytes) == 0 { + dec.RawBytes = nil // normalize + } + + require.Equal(t, tc.val.Value, dec) + require.Equal(t, tc.val.IsTombstone(), len(dec.RawBytes) == 0) + isTombstone, err := EncodedMVCCValueIsTombstone(enc) + require.NoError(t, err) + require.Equal(t, tc.val.IsTombstone(), isTombstone) + }) } } @@ -233,6 +251,14 @@ func TestDecodeMVCCValueErrors(t *testing.T) { require.Equal(t, tc.expect, err) require.False(t, isTombstone) }) + t.Run("DecodeValueFromMVCCValue/"+name, func(t *testing.T) { + dec, err := DecodeValueFromMVCCValue(tc.enc) + require.Equal(t, tc.expect, err) + require.Zero(t, dec) + isTombstone, err := EncodedMVCCValueIsTombstone(tc.enc) + require.Equal(t, tc.expect, err) + require.False(t, isTombstone) + }) } } @@ -283,6 +309,26 @@ func BenchmarkEncodeMVCCValue(b *testing.B) { } } +func BenchmarkEncodeMVCCValueForExport(b *testing.B) { + DisableMetamorphicSimpleValueEncoding(b) + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := EncodeMVCCValueForExport(mvccValue) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkDecodeMVCCValue(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers { @@ -316,6 +362,27 @@ func BenchmarkDecodeMVCCValue(b *testing.B) { } } +func BenchmarkDecodeValueFromMVCCValue(b *testing.B) { + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + buf, err := EncodeMVCCValue(mvccValue) + require.NoError(b, err) + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := DecodeValueFromMVCCValue(buf) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkMVCCValueIsTombstone(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers {