kv/bulk: write ImportEpoch to each MVCCValue during IMPORT
This patch makes IMPORT INTO on a non-empty table write the table's
ImportEpoch to each ingested MVCCValue, via the SSTBatcher. In a
future PR, the ImportEpoch will be used to roll back an IMPORT INTO
in some cases.

This additional information will allow tables that are mid-IMPORT to
be backed up and restored.

As part of this change, we now also assume that we may see encoded
MVCCValues during restore.

* Version Gating

Previously, callers could (and did) assume that the values present in
the SSTs returned by an ExportRequest could be interpreted directly as
roachpb.Value objects using code like:

    roachpb.Value{RawBytes: valBytes}
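
With value headers present, those bytes may be a fully encoded
MVCCValue, so they must be decoded rather than wrapped. A minimal
sketch of the replacement pattern, mirroring the
restore_data_processor.go change below (decodeExportedValue is a
hypothetical wrapper; DecodeValueFromMVCCValue accepts both bare
roachpb.Value bytes and encoded MVCCValues):

    // decodeExportedValue strips any MVCCValueHeader and returns the
    // embedded roachpb.Value; bare old-style bytes pass through as-is.
    func decodeExportedValue(valBytes []byte) (roachpb.Value, error) {
        return storage.DecodeValueFromMVCCValue(valBytes)
    }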

For MVCCValueHeaders to be exported by ExportRequest, all callers
need to be updated:

1. ExportRequest on system.descriptors in sql/catalog/lease
2. ExportRequest on system.descriptors in ccl/changefeedccl/schemafeed
3. ExportRequest used by `FINGERPRINT`
4. ExportRequest used by old binaries in a mixed-version cluster.

(1) and (2) will be easy to update and likely don't matter in
practice, as those tables do not currently include values with
exportable value headers.

(3) will be easy to update, but we still need an option to exclude
value headers (a) until value headers are included in rangefeeds
and (b) so long as we want to compare fingerprints with 23.2 versions.

(4) is impossible to update, so if we want BACKUP/RESTORE to
round-trip in a mixed-version cluster, we must version-gate the
inclusion of value headers in backups until the cluster is on a
single version.

To account for this, we only increment ImportEpoch during IMPORTs
that start on 24.1 or greater, and we only request MVCCValueHeaders
on BACKUPs that start on 24.1 or greater.

The first condition is important to ensure that we can later detect a
table that can be fully rolled back using the new rollback method.
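
A sketch of the backup-side gate, as wired through backup_job.go
below (the manifest's ClusterVersion is the version the backup
started on):

    // Request MVCCValueHeaders only when the backup starts on 24.1+.
    includeValueHeader := backupManifest.ClusterVersion.AtLeast(
        clusterversion.V24_1.Version())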

Note that this also marks a hard backward incompatibility for backup
artifacts: backups taken on 24.1 cannot be restored on 23.2 or older.
This was already the case by policy. 23.2 backups should still work
fine on 24.1, since all roachpb.Values should decode properly as
MVCCValues.
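
That decode compatibility holds because the "simple" value encoding
is itself a valid MVCCValue; a sketch of the assumption:

    // Bare roachpb.Value bytes decode as an MVCCValue with an empty
    // MVCCValueHeader, so 23.2 backup data stays readable on 24.1.
    mvccVal, err := storage.DecodeMVCCValue(rawBytes)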

Informs #76722

Release note: None

Co-authored-by: Steven Danna <[email protected]>
msbutler and stevendanna committed Mar 12, 2024
1 parent c097c19 commit 2d4b601
Showing 19 changed files with 451 additions and 56 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -244,6 +244,7 @@ func backup(
backupManifest.StartTime,
backupManifest.EndTime,
backupManifest.ElidedPrefix,
backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()),
)
if err != nil {
return roachpb.RowCount{}, 0, err
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/backup_processor.go
@@ -487,14 +487,14 @@ func runBackupProcessor(
if !span.firstKeyTS.IsEmpty() {
splitMidKey = true
}

req := &kvpb.ExportRequest{
-RequestHeader: kvpb.RequestHeaderFromSpan(span.span),
-ResumeKeyTS: span.firstKeyTS,
-StartTime: span.start,
-MVCCFilter: spec.MVCCFilter,
-TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
-SplitMidKey: splitMidKey,
+RequestHeader: kvpb.RequestHeaderFromSpan(span.span),
+ResumeKeyTS: span.firstKeyTS,
+StartTime: span.start,
+MVCCFilter: spec.MVCCFilter,
+TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
+SplitMidKey: splitMidKey,
+IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader,
}

// If we're doing re-attempts but are not yet in the priority regime,
45 changes: 24 additions & 21 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -46,6 +46,7 @@ func distBackupPlanSpecs(
mvccFilter kvpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
elide execinfrapb.ElidePrefix,
includeValueHeader bool,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
@@ -98,17 +99,18 @@
sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec)
for _, partition := range spanPartitions {
spec := &execinfrapb.BackupDataSpec{
-JobID: jobID,
-Spans: partition.Spans,
-DefaultURI: defaultURI,
-URIsByLocalityKV: urisByLocalityKV,
-MVCCFilter: mvccFilter,
-Encryption: fileEncryption,
-PKIDs: pkIDs,
-BackupStartTime: startTime,
-BackupEndTime: endTime,
-UserProto: user.EncodeProto(),
-ElidePrefix: elide,
+JobID: jobID,
+Spans: partition.Spans,
+DefaultURI: defaultURI,
+URIsByLocalityKV: urisByLocalityKV,
+MVCCFilter: mvccFilter,
+Encryption: fileEncryption,
+PKIDs: pkIDs,
+BackupStartTime: startTime,
+BackupEndTime: endTime,
+UserProto: user.EncodeProto(),
+ElidePrefix: elide,
+IncludeMVCCValueHeader: includeValueHeader,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
@@ -121,16 +123,17 @@
// which is not the leaseholder for any of the spans, but is for an
// introduced span.
spec := &execinfrapb.BackupDataSpec{
-JobID: jobID,
-IntroducedSpans: partition.Spans,
-DefaultURI: defaultURI,
-URIsByLocalityKV: urisByLocalityKV,
-MVCCFilter: mvccFilter,
-Encryption: fileEncryption,
-PKIDs: pkIDs,
-BackupStartTime: startTime,
-BackupEndTime: endTime,
-UserProto: user.EncodeProto(),
+JobID: jobID,
+IntroducedSpans: partition.Spans,
+DefaultURI: defaultURI,
+URIsByLocalityKV: urisByLocalityKV,
+MVCCFilter: mvccFilter,
+Encryption: fileEncryption,
+PKIDs: pkIDs,
+BackupStartTime: startTime,
+BackupEndTime: endTime,
+UserProto: user.EncodeProto(),
+IncludeMVCCValueHeader: includeValueHeader,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
15 changes: 12 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
return summary, err
}
valueScratch = append(valueScratch[:0], v...)
-value := roachpb.Value{RawBytes: valueScratch}
+value, err := storage.DecodeValueFromMVCCValue(valueScratch)
+if err != nil {
+return summary, err
+}

key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime)

@@ -581,7 +583,14 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if verbose {
log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
}
-if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {
+
+// Using valueScratch here assumes that
+// DecodeValueFromMVCCValue, ClearChecksum, and
+// InitChecksum don't copy/reallocate the slice they
+// were given. We expect that value.ClearChecksum and
+// value.InitChecksum calls above have modified
+// valueScratch.
+if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil {
return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
}
}
20 changes: 17 additions & 3 deletions pkg/kv/bulk/buffering_adder.go
@@ -63,6 +63,12 @@ type BufferingAdder struct {
// name of the BufferingAdder for the purpose of logging only.
name string

// importEpoch specifies the ImportEpoch of the table the BufferingAdder
// is ingesting data as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
importEpoch uint32

bulkMon *mon.BytesMonitor
memAcc mon.BoundAccount

@@ -96,7 +102,8 @@ func MakeBulkAdder(
}

b := &BufferingAdder{
-name: opts.Name,
+name: opts.Name,
+importEpoch: opts.ImportEpoch,
sink: SSTBatcher{
name: opts.Name,
db: db,
@@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

for i := range b.curBuf.entries {
mvccKey.Key = b.curBuf.Key(i)
-if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
-return err
+if b.importEpoch != 0 {
+if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i),
+b.importEpoch); err != nil {
+return err
+}
+} else {
+if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
+return err
+}
}
}
if err := b.sink.Flush(ctx); err != nil {
18 changes: 16 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
@@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
b.mu.onFlush = onFlush
}

// AddMVCCKeyWithImportEpoch is like AddMVCCKey, except it stamps the
// given importEpoch into the MVCCValueHeader of each value before
// buffering it.
func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
) error {
mvccVal, err := storage.DecodeMVCCValue(value)
if err != nil {
return err
}
mvccVal.MVCCValueHeader.ImportEpoch = importEpoch
encVal, err := storage.EncodeMVCCValue(mvccVal)
if err != nil {
return err
}
return b.AddMVCCKey(ctx, key, encVal)
}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
// This is only for callers that want to control the timestamp on individual
// keys -- like RESTORE where we want the restored data to look like the backup.
@@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
if !b.disallowShadowingBelow.IsEmpty() {
b.updateMVCCStats(key, value)
}

-return b.sstWriter.Put(key, value)
+return b.sstWriter.PutRawMVCC(key, value)
}

// Reset clears all state in the batcher and prepares it for reuse.
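
The Put-to-PutRawMVCC switch above matters because, after
AddMVCCKeyWithImportEpoch, the value bytes may already carry an
encoded MVCCValueHeader. A hedged sketch of the assumed writer
contract:

    // Put expects bare roachpb.Value bytes; PutRawMVCC writes bytes
    // that are already a fully encoded MVCCValue, header included.
    if err := b.sstWriter.PutRawMVCC(key, encodedMVCCValue); err != nil {
        return err
    }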
77 changes: 76 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
@@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
-ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
+ctx, kvDB, mockCache, s.ClusterSettings(), ts,
+kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
)
require.NoError(t, err)

@@ -361,3 +362,77 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
})
}
}

var DummyImportEpoch uint32 = 3

func TestImportEpochIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()

defer log.Scope(t).Close(t)
ctx := context.Background()

mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(),
false, true, mem.MakeConcurrentBoundAccount(), reqs)
require.NoError(t, err)
defer b.Close(ctx)

startKey := storageutils.PointKey("a", 1)
endKey := storageutils.PointKey("b", 1)
value := storageutils.StringValueRaw("myHumbleValue")
mvccValue, err := storage.DecodeMVCCValue(value)
require.NoError(t, err)

require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch))
require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch))
require.NoError(t, b.Flush(ctx))

// Check that ingested key contains the dummy job ID
req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{
Key: startKey.Key,
EndKey: endKey.Key,
},
MVCCFilter: kvpb.MVCCFilter_All,
StartTime: hlc.Timestamp{},
IncludeMVCCValueHeader: true,
}

header := kvpb.Header{Timestamp: s.Clock().Now()}
resp, roachErr := kv.SendWrappedWith(ctx,
kvDB.NonTransactionalSender(), header, req)
require.NoError(t, roachErr.GoError())
iterOpts := storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: startKey.Key,
UpperBound: endKey.Key,
}

checkedJobId := false
for _, file := range resp.(*kvpb.ExportResponse).Files {
it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
require.NoError(t, err)
defer it.Close()
for it.SeekGE(storage.NilKey); ; it.Next() {
ok, err := it.Valid()
require.NoError(t, err)
if !ok {
break
}
rawVal, err := it.UnsafeValue()
require.NoError(t, err)
val, err := storage.DecodeMVCCValue(rawVal)
require.NoError(t, err)
require.Equal(t, startKey, it.UnsafeKey())
require.Equal(t, mvccValue.Value, val.Value)
require.Equal(t, DummyImportEpoch, val.ImportEpoch)
require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
checkedJobId = true
}
}
require.Equal(t, true, checkedJobId)
}
9 changes: 8 additions & 1 deletion pkg/kv/kvpb/api.proto
@@ -1831,7 +1831,14 @@ message ExportRequest {

FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false];

-// Next ID: 16
+// IncludeMVCCValueHeader controls whether the MVCCValueHeader is
+// included in exported bytes. Callers should only set this when all
+// readers of the returned SST are prepared to parse a full
+// MVCCValue. Even when set, only fields appropriate for export are
+// included. See storage.EncodeMVCCValueForExport for details.
+bool include_mvcc_value_header = 16 [(gogoproto.customname) = "IncludeMVCCValueHeader"];
+
+// Next ID: 17
}

message FingerprintOptions {
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -197,6 +197,7 @@ func evalExport(
TargetLockConflictBytes: targetLockConflictBytes,
StopMidKey: args.SplitMidKey,
ScanStats: cArgs.ScanStats,
IncludeMVCCValueHeader: args.IncludeMVCCValueHeader,
}
var summary kvpb.BulkOpSummary
var resumeInfo storage.ExportRequestResumeInfo
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/kvserverbase/bulk_adder.go
@@ -69,6 +69,15 @@ type BulkAdderOptions struct {
// the first buffer to pick split points in the hope it is a representative
// sample of the overall input.
InitialSplitsIfUnordered int

// ImportEpoch specifies the ImportEpoch of the table the BulkAdder
// is ingesting data into as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
//
// Callers should check that the cluster is at or above
// version 24.1 before setting this option.
ImportEpoch uint32
}

// BulkAdderFactory describes a factory function for BulkAdders.
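
A hedged sketch of how an IMPORT processor might populate this
option; the descriptor accessor and factory wiring here are
assumptions, while the version check mirrors the backup-side gate:

    opts := kvserverbase.BulkAdderOptions{Name: "import-adder"}
    // Stamp epochs only once every node can parse MVCCValueHeaders.
    if st.Version.IsActive(ctx, clusterversion.V24_1) {
        opts.ImportEpoch = tableDesc.GetImportEpoch() // hypothetical accessor
    }
    adder, err := flowCtx.Cfg.BulkAdder(ctx, db, writeTS, opts)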
9 changes: 8 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -318,7 +318,14 @@ message BackupDataSpec {
optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];

optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false];
-// NEXTID: 13.
+
+// IncludeMVCCValueHeader indicates whether the backup should be
+// created with MVCCValueHeaders in the exported data. This should
+// only be set on backups starting on cluster version 24.1 or
+// greater.
+optional bool include_mvcc_value_header = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IncludeMVCCValueHeader"];
+
+// NEXTID: 14.
}

message RestoreFileSpec {
3 changes: 3 additions & 0 deletions pkg/sql/importer/BUILD.bazel
@@ -148,6 +148,7 @@ go_test(
"exportparquet_test.go",
"import_csv_mark_redaction_test.go",
"import_into_test.go",
"import_mvcc_test.go",
"import_processor_test.go",
"import_stmt_test.go",
"main_test.go",
@@ -187,6 +188,7 @@ go_test(
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
@@ -229,6 +231,7 @@ go_test(
"//pkg/sql/stats",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/jobutils",
