diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 70bad3a6efd6..41283142a98f 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -169,6 +169,7 @@ go_test( "bench_test.go", "create_scheduled_backup_test.go", "datadriven_test.go", + "file_sst_sink_test.go", "full_cluster_backup_restore_test.go", "incrementals_test.go", "insert_missing_public_schema_namespace_entry_restore_test.go", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index bd31037f4375..d913c889a4b4 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -908,7 +908,7 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error { // If this span extended the last span added -- that is, picked up where it // ended and has the same time-bounds -- then we can simply extend that span // and add to its entry counts. Otherwise we need to record it separately. - if l := len(s.flushedFiles) - 1; l > 0 && s.flushedFiles[l].Span.EndKey.Equal(span.Key) && + if l := len(s.flushedFiles) - 1; l >= 0 && s.flushedFiles[l].Span.EndKey.Equal(span.Key) && s.flushedFiles[l].EndTime.EqOrdering(resp.f.EndTime) && s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) { s.flushedFiles[l].Span.EndKey = span.EndKey diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 7a18317f1c19..84fe6a4acc1b 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -3915,7 +3915,7 @@ func TestBackupRestoreChecksum(t *testing.T) { } // Corrupt one of the files in the backup. - f, err := os.OpenFile(filepath.Join(dir, backupManifest.Files[1].Path), os.O_WRONLY, 0) + f, err := os.OpenFile(filepath.Join(dir, backupManifest.Files[0].Path), os.O_WRONLY, 0) if err != nil { t.Fatalf("%+v", err) } diff --git a/pkg/ccl/backupccl/file_sst_sink_test.go b/pkg/ccl/backupccl/file_sst_sink_test.go new file mode 100644 index 000000000000..ebfc8faea6f8 --- /dev/null +++ b/pkg/ccl/backupccl/file_sst_sink_test.go @@ -0,0 +1,137 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/blobs" + "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/require" +) + +// TestFileSSTSinkExtendOneFile is a regression test for a bug in fileSSTSink in +// which the sink fails to extend its last span added if there's only one file +// in the sink so far. +func TestFileSSTSinkExtendOneFile(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc, sqlDB, _, cleanup := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication) + defer cleanup() + + store, err := cloud.ExternalStorageFromURI(ctx, "userfile:///0", + base.ExternalIODirConfig{}, + tc.Servers[0].ClusterSettings(), + blobs.TestEmptyBlobClientFactory, + security.RootUserName(), + tc.Servers[0].InternalExecutor().(*sql.InternalExecutor), + tc.Servers[0].DB(), + nil, /* limiters */ + ) + require.NoError(t, err) + sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.file_size = '20B'`) + + // Never block. + progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, 10) + + sinkConf := sstSinkConf{ + id: 1, + enc: nil, + progCh: progCh, + settings: &tc.Servers[0].ClusterSettings().SV, + } + + sink, err := makeSSTSink(ctx, sinkConf, store, nil) + require.NoError(t, err) + + getKeys := func(prefix string, n int) []byte { + var b bytes.Buffer + sst := storage.MakeBackupSSTWriter(ctx, nil, &b) + for i := 0; i < n; i++ { + require.NoError(t, sst.PutUnversioned([]byte(fmt.Sprintf("%s%08d", prefix, i)), nil)) + } + sst.Close() + return b.Bytes() + } + + exportResponse1 := returnedSST{ + f: BackupManifest_File{ + Span: roachpb.Span{ + Key: []byte("b"), + EndKey: []byte("b"), + }, + EntryCounts: roachpb.RowCount{ + DataSize: 100, + Rows: 1, + }, + StartTime: hlc.Timestamp{}, + EndTime: hlc.Timestamp{}, + LocalityKV: "", + }, + sst: getKeys("b", 100), + revStart: hlc.Timestamp{}, + completedSpans: 1, + atKeyBoundary: false, + } + + exportResponse2 := returnedSST{ + f: BackupManifest_File{ + Span: roachpb.Span{ + Key: []byte("b"), + EndKey: []byte("z"), + }, + EntryCounts: roachpb.RowCount{ + DataSize: 100, + Rows: 1, + }, + StartTime: hlc.Timestamp{}, + EndTime: hlc.Timestamp{}, + LocalityKV: "", + }, + sst: getKeys("c", 100), + revStart: hlc.Timestamp{}, + completedSpans: 1, + atKeyBoundary: true, + } + + require.NoError(t, sink.write(ctx, exportResponse1)) + require.NoError(t, sink.write(ctx, exportResponse2)) + + close(progCh) + + var progs []execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + for p := range progCh { + progs = append(progs, p) + } + + require.Equal(t, 1, len(progs)) + var progDetails BackupManifest_Progress + if err := types.UnmarshalAny(&progs[0].ProgressDetails, &progDetails); err != nil { + t.Fatal(err) + } + + // Verify that the file in the sink was properly extended and there is only 1 + // file in the progress details. + require.Equal(t, 1, len(progDetails.Files)) +} diff --git a/pkg/ccl/backupccl/show_test.go b/pkg/ccl/backupccl/show_test.go index 76e128f8f44b..82636368cdc4 100644 --- a/pkg/ccl/backupccl/show_test.go +++ b/pkg/ccl/backupccl/show_test.go @@ -52,6 +52,7 @@ func TestShowBackup(t *testing.T) { defer cleanupEmptyCluster() sqlDB.Exec(t, ` SET CLUSTER SETTING sql.cross_db_fks.enabled = TRUE; +SET CLUSTER SETTING bulkio.backup.file_size = '1'; CREATE TYPE data.welcome AS ENUM ('hello', 'hi'); USE data; CREATE SCHEMA sc; CREATE TABLE data.sc.t1 (a INT);