From cb6934eb54ba64ccb65b427213f92746b2c77aaa Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 12 May 2021 21:56:54 -0700 Subject: [PATCH 1/3] kv/kvserver: skip TestRejectedLeaseDoesntDictateClosedTimestamp Refs: #65109 Reason: flaky test Generated by bin/skip-test. Release justification: non-production code changes Release note: None --- pkg/kv/kvserver/replica_closedts_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/kv/kvserver/replica_closedts_test.go b/pkg/kv/kvserver/replica_closedts_test.go index f9b7345cbc23..600386e6abf1 100644 --- a/pkg/kv/kvserver/replica_closedts_test.go +++ b/pkg/kv/kvserver/replica_closedts_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -449,6 +450,7 @@ func TestBumpSideTransportClosed(t *testing.T) { // 6, the lease proposal doesn't bump the assignedClosedTimestamp. func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() + skip.WithIssue(t, 65109, "flaky test") defer log.Scope(t).Close(t) ctx := context.Background() From 8b944e6105121ab1035c8e74fea17b30650ecce2 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 12 May 2021 14:14:58 +0000 Subject: [PATCH 2/3] storage/cloud: switch callers to cloud.WriteFile helper This changes all callers of ExternalStorage.WriteFile() to cloud.WriteFile(ExternalStorage). This shim just calls the old method under the hood, but will allow swapping ExternalStorage's API to a Writer that this shim can then use. Release note: none. 
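As a sketch of the mechanical change this makes at each call site (the store variable, file name, and payload here are illustrative, not copied from any one hunk below):

    // Before: callers invoked the interface method directly.
    err := store.WriteFile(ctx, "backup.sst", bytes.NewReader(data))

    // After: callers go through the package-level helper, which for now
    // just forwards to the same interface method under the hood.
    err = cloud.WriteFile(ctx, store, "backup.sst", bytes.NewReader(data))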
--- pkg/ccl/backupccl/backup_destination_test.go | 4 ++-- pkg/ccl/backupccl/backup_job.go | 2 +- pkg/ccl/backupccl/backup_processor.go | 2 +- pkg/ccl/backupccl/manifest_handling.go | 10 +++++----- pkg/ccl/changefeedccl/sink_cloudstorage.go | 4 ++-- pkg/ccl/cliccl/debug_backup.go | 2 +- pkg/ccl/importccl/exportcsv.go | 2 +- pkg/ccl/importccl/import_stmt.go | 2 +- pkg/ccl/importccl/import_stmt_test.go | 10 +++++----- pkg/ccl/importccl/read_import_base.go | 2 +- pkg/ccl/storageccl/export.go | 2 +- pkg/sql/copy_file_upload.go | 3 ++- pkg/storage/cloud/cloud_io.go | 6 ++++++ .../cloud/cloudtestutils/cloud_test_helpers.go | 14 +++++++------- pkg/storage/cloud/httpsink/http_storage_test.go | 4 ++-- .../cloud/nullsink/nullsink_storage_test.go | 2 +- .../cloud/userfile/file_table_storage_test.go | 6 +++--- 17 files changed, 42 insertions(+), 35 deletions(-) diff --git a/pkg/ccl/backupccl/backup_destination_test.go b/pkg/ccl/backupccl/backup_destination_test.go index 378691e558aa..e06390417637 100644 --- a/pkg/ccl/backupccl/backup_destination_test.go +++ b/pkg/ccl/backupccl/backup_destination_test.go @@ -63,7 +63,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { storage, err := externalStorageFromURI(ctx, uri, security.RootUserName()) defer storage.Close() require.NoError(t, err) - require.NoError(t, storage.WriteFile(ctx, backupManifestName, emptyReader)) + require.NoError(t, cloud.WriteFile(ctx, storage, backupManifestName, emptyReader)) } // writeLatest writes latestBackupSuffix to the LATEST file in the given @@ -72,7 +72,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { storage, err := externalStorageFromURI(ctx, collectionURI, security.RootUserName()) defer storage.Close() require.NoError(t, err) - require.NoError(t, storage.WriteFile(ctx, latestFileName, bytes.NewReader([]byte(latestBackupSuffix)))) + require.NoError(t, cloud.WriteFile(ctx, storage, latestFileName, bytes.NewReader([]byte(latestBackupSuffix)))) } // localizeURI returns a slice of just the base URI if localities is nil. 
diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 641dc1b93078..9a17f8c5a3c8 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -524,7 +524,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } defer c.Close() - if err := c.WriteFile(ctx, latestFileName, strings.NewReader(suffix)); err != nil { + if err := cloud.WriteFile(ctx, c, latestFileName, strings.NewReader(suffix)); err != nil { return err } } diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 200d203f40ea..71498573cbdd 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -456,7 +456,7 @@ func writeFile( exportStore = localitySpecificStore } - if err := exportStore.WriteFile(ctx, file.Path, bytes.NewReader(data)); err != nil { + if err := cloud.WriteFile(ctx, exportStore, file.Path, bytes.NewReader(data)); err != nil { log.VEventf(ctx, 1, "failed to put file: %+v", err) return errors.Wrap(err, "writing SST") } diff --git a/pkg/ccl/backupccl/manifest_handling.go b/pkg/ccl/backupccl/manifest_handling.go index cb6ea4d27e74..a1189fc1bcca 100644 --- a/pkg/ccl/backupccl/manifest_handling.go +++ b/pkg/ccl/backupccl/manifest_handling.go @@ -391,7 +391,7 @@ func writeBackupManifest( } } - if err := exportStore.WriteFile(ctx, filename, bytes.NewReader(descBuf)); err != nil { + if err := cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf)); err != nil { return err } @@ -400,7 +400,7 @@ func writeBackupManifest( if err != nil { return errors.Wrap(err, "calculating checksum") } - if err := exportStore.WriteFile(ctx, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil { + if err := cloud.WriteFile(ctx, exportStore, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil { return errors.Wrap(err, "writing manifest checksum") } @@ -487,7 +487,7 @@ func writeBackupPartitionDescriptor( } } - return exportStore.WriteFile(ctx, filename, bytes.NewReader(descBuf)) + return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf)) } // writeTableStatistics writes a StatsTable object to a file of the filename @@ -515,7 +515,7 @@ func writeTableStatistics( return err } } - return exportStore.WriteFile(ctx, filename, bytes.NewReader(statsBuf)) + return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(statsBuf)) } func loadBackupManifests( @@ -951,7 +951,7 @@ func writeEncryptionInfoIfNotExists( if err != nil { return err } - if err := dest.WriteFile(ctx, backupEncryptionInfoFile, bytes.NewReader(buf)); err != nil { + if err := cloud.WriteFile(ctx, dest, backupEncryptionInfoFile, bytes.NewReader(buf)); err != nil { return err } return nil diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 30b38186341f..093936fa7375 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -452,7 +452,7 @@ func (s *cloudStorageSink) EmitResolvedTimestamp( if log.V(1) { log.Infof(ctx, "writing file %s %s", filename, resolved.AsOfSystemTime()) } - return s.es.WriteFile(ctx, filepath.Join(part, filename), bytes.NewReader(payload)) + return cloud.WriteFile(ctx, s.es, filepath.Join(part, filename), bytes.NewReader(payload)) } // flushTopicVersions flushes all open files for the provided topic up to and @@ -553,7 +553,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file 
*cloudStorageSink "precedes a file emitted before: %s", filename, s.prevFilename) } s.prevFilename = filename - if err := s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil { + if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil { return err } return nil diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 2a83f0b1f384..9edaf3d74f72 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -517,7 +517,7 @@ func showData( if err != nil { return errors.Wrapf(err, "unable to open store to write files: %s", debugBackupArgs.destination) } - if err = store.WriteFile(ctx, file, bytes.NewReader(buf.Bytes())); err != nil { + if err = cloud.WriteFile(ctx, store, file, bytes.NewReader(buf.Bytes())); err != nil { _ = store.Close() return err } diff --git a/pkg/ccl/importccl/exportcsv.go b/pkg/ccl/importccl/exportcsv.go index fa02e840aaf8..d77f12dcbc1f 100644 --- a/pkg/ccl/importccl/exportcsv.go +++ b/pkg/ccl/importccl/exportcsv.go @@ -268,7 +268,7 @@ func (sp *csvWriter) Run(ctx context.Context) { size := writer.Len() - if err := es.WriteFile(ctx, filename, bytes.NewReader(writer.Bytes())); err != nil { + if err := cloud.WriteFile(ctx, es, filename, bytes.NewReader(writer.Bytes())); err != nil { return err } res := rowenc.EncDatumRow{ diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 2f54d671902d..bb97ca8bc6aa 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -1693,7 +1693,7 @@ func (u *unsupportedStmtLogger) flush() error { } else { logFileName = path.Join(logFileName, pgDumpUnsupportedSchemaStmtLog, fmt.Sprintf("%d.log", u.flushCount)) } - err = s.WriteFile(u.ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes())) + err = cloud.WriteFile(u.ctx, s, logFileName, bytes.NewReader(u.logBuffer.Bytes())) if err != nil { return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP") } diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index b870ce0484aa..6aa2cdae4eb3 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -2286,7 +2286,7 @@ b STRING) CSV DATA (%s)`, testFiles.files[0])); err != nil { require.NoError(t, err) data := []byte("1,2") - require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data))) + require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data))) sqlDB.Exec(t, fmt.Sprintf("IMPORT TABLE foo (id INT PRIMARY KEY, "+ "id2 INT) CSV DATA ('%s')", userfileURI)) @@ -2302,7 +2302,7 @@ b STRING) CSV DATA (%s)`, testFiles.files[0])); err != nil { require.NoError(t, err) data := []byte("1,2") - require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data))) + require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data))) sqlDB.Exec(t, fmt.Sprintf("IMPORT TABLE baz (id INT PRIMARY KEY, "+ "id2 INT) CSV DATA ('%s')", userfileURI)) @@ -2412,7 +2412,7 @@ func TestImportObjectLevelRBAC(t *testing.T) { fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.TestUserName(), ie, tc.Server(0).DB()) require.NoError(t, err) - require.NoError(t, fileTableSystem1.WriteFile(ctx, filename, bytes.NewReader([]byte("1,aaa")))) + require.NoError(t, cloud.WriteFile(ctx, 
fileTableSystem1, filename, bytes.NewReader([]byte("1,aaa")))) } t.Run("import-RBAC", func(t *testing.T) { @@ -3440,7 +3440,7 @@ func TestImportIntoCSV(t *testing.T) { require.NoError(t, err) data := []byte("1,2") - require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data))) + require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data))) sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, id2 INT)") sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO foo (id, id2) CSV DATA ('%s')", userfileURI)) @@ -3507,7 +3507,7 @@ func benchUserUpload(b *testing.B, uploadBaseURI string) { require.NoError(b, err) content, err := ioutil.ReadAll(r) require.NoError(b, err) - err = userfileStorage.WriteFile(ctx, "", bytes.NewReader(content)) + err = cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(content)) require.NoError(b, err) numBytes = int64(len(content)) } else { diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index 15700c58cc78..c448baf25435 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -249,7 +249,7 @@ func readInputFiles( return err } defer rejectedStorage.Close() - if err := rejectedStorage.WriteFile(ctx, "", bytes.NewReader(buf)); err != nil { + if err := cloud.WriteFile(ctx, rejectedStorage, "", bytes.NewReader(buf)); err != nil { return err } return nil diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index ab6aba42bab8..5fb658982fe1 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -215,7 +215,7 @@ func evalExport( if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error { // We blindly retry any error here because we expect the caller to have // verified the target is writable before sending ExportRequests for it. - if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil { + if err := cloud.WriteFile(ctx, exportStore, exported.Path, bytes.NewReader(data)); err != nil { log.VEventf(ctx, 1, "failed to put file: %+v", err) return err } diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go index 13c473424c25..f363d5018ee1 100644 --- a/pkg/sql/copy_file_upload.go +++ b/pkg/sql/copy_file_upload.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/errors" "github.com/lib/pq" ) @@ -136,7 +137,7 @@ func newFileUploadMachine( f.wg.Add(1) go func() { - err := store.WriteFile(ctx, "", &noopReadSeeker{pr}) + err := cloud.WriteFile(ctx, store, "", &noopReadSeeker{pr}) if err != nil { _ = pr.CloseWithError(err) } diff --git a/pkg/storage/cloud/cloud_io.go b/pkg/storage/cloud/cloud_io.go index 68df4484f61e..6c267ebc85a0 100644 --- a/pkg/storage/cloud/cloud_io.go +++ b/pkg/storage/cloud/cloud_io.go @@ -251,3 +251,9 @@ func CheckHTTPContentRangeHeader(h string, pos int64) (int64, error) { return size, nil } + +// WriteFile is a helper for writing the content of a Reader to the given path +// of an ExternalStorage. 
+func WriteFile(ctx context.Context, dest ExternalStorage, basename string, src io.ReadSeeker) error { + return dest.WriteFile(ctx, basename, src) +} diff --git a/pkg/storage/cloud/cloudtestutils/cloud_test_helpers.go b/pkg/storage/cloud/cloudtestutils/cloud_test_helpers.go index a8aae18ae9ad..543d56735201 100644 --- a/pkg/storage/cloud/cloudtestutils/cloud_test_helpers.go +++ b/pkg/storage/cloud/cloudtestutils/cloud_test_helpers.go @@ -163,7 +163,7 @@ func CheckExportStore( for i := 0; i < 10; i++ { name := fmt.Sprintf("%s-%d", sampleName, i) payload := []byte(strings.Repeat(sampleBytes, i)) - if err := s.WriteFile(ctx, name, bytes.NewReader(payload)); err != nil { + if err := cloud.WriteFile(ctx, s, name, bytes.NewReader(payload)); err != nil { t.Fatal(err) } @@ -199,7 +199,7 @@ func CheckExportStore( testingFilename := "testing-123" // Write some random data (random so it doesn't compress). - if err := s.WriteFile(ctx, testingFilename, bytes.NewReader(testingContent)); err != nil { + if err := cloud.WriteFile(ctx, s, testingFilename, bytes.NewReader(testingContent)); err != nil { t.Fatal(err) } @@ -248,7 +248,7 @@ func CheckExportStore( } t.Run("read-single-file-by-uri", func(t *testing.T) { const testingFilename = "A" - if err := s.WriteFile(ctx, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil { + if err := cloud.WriteFile(ctx, s, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil { t.Fatal(err) } singleFile := storeFromURI(ctx, t, appendPath(t, storeURI, testingFilename), clientFactory, @@ -276,7 +276,7 @@ func CheckExportStore( user, ie, kvDB, testSettings) defer singleFile.Close() - if err := singleFile.WriteFile(ctx, "", bytes.NewReader([]byte("bbb"))); err != nil { + if err := cloud.WriteFile(ctx, singleFile, "", bytes.NewReader([]byte("bbb"))); err != nil { t.Fatal(err) } @@ -301,7 +301,7 @@ func CheckExportStore( // (based on the storage system) could not be found. 
t.Run("file-does-not-exist", func(t *testing.T) { const testingFilename = "A" - if err := s.WriteFile(ctx, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil { + if err := cloud.WriteFile(ctx, s, testingFilename, bytes.NewReader([]byte("aaa"))); err != nil { t.Fatal(err) } singleFile := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, kvDB, testSettings) @@ -375,7 +375,7 @@ func CheckListFilesCanonical( clientFactory := blobs.TestBlobServiceClient(testSettings.ExternalIODir) for _, fileName := range fileNames { file := storeFromURI(ctx, t, storeURI, clientFactory, user, ie, kvDB, testSettings) - if err := file.WriteFile(ctx, fileName, bytes.NewReader([]byte("bbb"))); err != nil { + if err := cloud.WriteFile(ctx, file, fileName, bytes.NewReader([]byte("bbb"))); err != nil { t.Fatal(err) } _ = file.Close() @@ -537,7 +537,7 @@ func uploadData( nil, nil, nil) require.NoError(t, err) defer s.Close() - require.NoError(t, s.WriteFile(ctx, basename, bytes.NewReader(data))) + require.NoError(t, cloud.WriteFile(ctx, s, basename, bytes.NewReader(data))) return data, func() { _ = s.Delete(ctx, basename) } diff --git a/pkg/storage/cloud/httpsink/http_storage_test.go b/pkg/storage/cloud/httpsink/http_storage_test.go index 5cf1a210eb24..912f7c06fce4 100644 --- a/pkg/storage/cloud/httpsink/http_storage_test.go +++ b/pkg/storage/cloud/httpsink/http_storage_test.go @@ -166,10 +166,10 @@ func TestPutHttp(t *testing.T) { const file = "file" var content = []byte("contents") - if err := s.WriteFile(ctx, file, bytes.NewReader(content)); err != nil { + if err := cloud.WriteFile(ctx, s, file, bytes.NewReader(content)); err != nil { t.Fatal(err) } - if err := s.WriteFile(ctx, badHeadResponse, bytes.NewReader(content)); err != nil { + if err := cloud.WriteFile(ctx, s, badHeadResponse, bytes.NewReader(content)); err != nil { t.Fatal(err) } if sz, err := s.Size(ctx, file); err != nil { diff --git a/pkg/storage/cloud/nullsink/nullsink_storage_test.go b/pkg/storage/cloud/nullsink/nullsink_storage_test.go index 520a4cf14b16..6f62acbff01e 100644 --- a/pkg/storage/cloud/nullsink/nullsink_storage_test.go +++ b/pkg/storage/cloud/nullsink/nullsink_storage_test.go @@ -43,7 +43,7 @@ func TestNullSinkReadAndWrite(t *testing.T) { defer s.Close() require.Equal(t, roachpb.ExternalStorage{Provider: roachpb.ExternalStorageProvider_null}, s.Conf()) - require.NoError(t, s.WriteFile(ctx, "", bytes.NewReader([]byte("abc")))) + require.NoError(t, cloud.WriteFile(ctx, s, "", bytes.NewReader([]byte("abc")))) sz, err := s.Size(ctx, "") require.NoError(t, err) require.Equal(t, int64(0), sz) diff --git a/pkg/storage/cloud/userfile/file_table_storage_test.go b/pkg/storage/cloud/userfile/file_table_storage_test.go index d7aa0a3d3d5f..cb95a7634392 100644 --- a/pkg/storage/cloud/userfile/file_table_storage_test.go +++ b/pkg/storage/cloud/userfile/file_table_storage_test.go @@ -73,7 +73,7 @@ func TestPutUserFileTable(t *testing.T) { require.NoError(t, err) defer store.Close() - err = store.WriteFile(ctx, testfile, bytes.NewReader([]byte{0})) + err = cloud.WriteFile(ctx, store, testfile, bytes.NewReader([]byte{0})) require.True(t, testutils.IsError(err, "does not permit such constructs")) }) } @@ -118,7 +118,7 @@ func TestUserScoping(t *testing.T) { fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, cluster.NoSettings, blobs.TestEmptyBlobClientFactory, user1, ie, kvDB) require.NoError(t, err) - require.NoError(t, fileTableSystem1.WriteFile(ctx, filename, bytes.NewReader([]byte("aaa")))) + 
require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte("aaa")))) // Attempt to read/write file as user2 and expect to fail. fileTableSystem2, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, @@ -126,7 +126,7 @@ func TestUserScoping(t *testing.T) { require.NoError(t, err) _, err = fileTableSystem2.ReadFile(ctx, filename) require.Error(t, err) - require.Error(t, fileTableSystem2.WriteFile(ctx, filename, bytes.NewReader([]byte("aaa")))) + require.Error(t, cloud.WriteFile(ctx, fileTableSystem2, filename, bytes.NewReader([]byte("aaa")))) // Read file as root and expect to succeed. fileTableSystem3, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{}, From ac529817e365802c9c39f6305f851918975ee4d7 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 12 May 2021 14:19:51 +0000 Subject: [PATCH 3/3] storage/cloud: replace WriteFile(Reader) with Writer This changes the ExternalStorage API's writing method. Previously, WriteFile took an io.ReadSeeker and wrote its content to the requested destination, returning any error encountered while doing so. The interface instead now has Writer(), which returns an io.Writer pointing at the destination; the caller writes to it incrementally, then closes it to finish the upload, or calls CloseWithError to cancel it. All existing callers use the shim and are unaffected, but can later choose to change to a push-based model and use the Writer directly. This is left to a follow-up change. Release note: none. --- pkg/blobs/client.go | 8 +-- pkg/ccl/importccl/testutils_test.go | 8 +-- pkg/sql/copy_file_upload.go | 40 +++++------- pkg/storage/cloud/BUILD.bazel | 1 + pkg/storage/cloud/amazon/s3_storage.go | 65 ++++++++++--------- pkg/storage/cloud/azure/azure_storage.go | 25 ++++--- pkg/storage/cloud/cloud_io.go | 56 +++++++++++++++- pkg/storage/cloud/external_storage.go | 14 +++- pkg/storage/cloud/gcp/BUILD.bazel | 1 - pkg/storage/cloud/gcp/gcs_storage.go | 24 ++----- pkg/storage/cloud/httpsink/http_storage.go | 13 ++-- .../cloud/nodelocal/nodelocal_storage.go | 11 ++-- .../cloud/nullsink/nullsink_storage.go | 14 ++-- .../cloud/userfile/file_table_storage.go | 44 ++----------- .../filetable/file_table_read_writer.go | 6 +- 15 files changed, 179 insertions(+), 151 deletions(-) diff --git a/pkg/blobs/client.go b/pkg/blobs/client.go index d5e4cd43bbd0..bfd75f8512e6 100644 --- a/pkg/blobs/client.go +++ b/pkg/blobs/client.go @@ -34,7 +34,7 @@ type BlobClient interface { // WriteFile sends the named payload to the requested node. // This method will read entire content of file and send // it over to another node, based on the nodeID. - WriteFile(ctx context.Context, file string, content io.ReadSeeker) error + WriteFile(ctx context.Context, file string, content io.Reader) error // List lists the corresponding filenames from the requested node. // The requested node can be the current node.
@@ -75,9 +75,7 @@ func (c *remoteClient) ReadFile( return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file") } -func (c *remoteClient) WriteFile( - ctx context.Context, file string, content io.ReadSeeker, -) (err error) { +func (c *remoteClient) WriteFile(ctx context.Context, file string, content io.Reader) (err error) { ctx = metadata.AppendToOutgoingContext(ctx, "filename", file) stream, err := c.blobClient.PutStream(ctx) if err != nil { @@ -143,7 +141,7 @@ func (c *localClient) ReadFile( return c.localStorage.ReadFile(file, offset) } -func (c *localClient) WriteFile(ctx context.Context, file string, content io.ReadSeeker) error { +func (c *localClient) WriteFile(ctx context.Context, file string, content io.Reader) error { return c.localStorage.WriteFile(file, content) } diff --git a/pkg/ccl/importccl/testutils_test.go b/pkg/ccl/importccl/testutils_test.go index 6fd116aa86b7..77009a108fd3 100644 --- a/pkg/ccl/importccl/testutils_test.go +++ b/pkg/ccl/importccl/testutils_test.go @@ -271,10 +271,10 @@ func (es *generatorExternalStorage) Size(ctx context.Context, basename string) ( return int64(es.gen.size), nil } -func (es *generatorExternalStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - return errors.New("unsupported") +func (es *generatorExternalStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + return nil, errors.New("unsupported") } func (es *generatorExternalStorage) ListFiles(ctx context.Context, _ string) ([]string, error) { diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go index f363d5018ee1..9f960aa9c631 100644 --- a/pkg/sql/copy_file_upload.go +++ b/pkg/sql/copy_file_upload.go @@ -16,7 +16,6 @@ import ( "io" "net/url" "strings" - "sync" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/lex" @@ -42,8 +41,7 @@ var _ copyMachineInterface = &fileUploadMachine{} type fileUploadMachine struct { c *copyMachine - writeToFile *io.PipeWriter - wg *sync.WaitGroup + w cloud.WriteCloserWithError failureCleanup func() } @@ -94,8 +92,7 @@ func newFileUploadMachine( p: planner{execCfg: execCfg, alloc: &rowenc.DatumAlloc{}}, } f = &fileUploadMachine{ - c: c, - wg: &sync.WaitGroup{}, + c: c, } // We need a planner to do the initial planning, even if a planner @@ -124,7 +121,6 @@ func newFileUploadMachine( return nil, err } - pr, pw := io.Pipe() store, err := c.p.execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, dest, c.p.User()) if err != nil { return nil, err @@ -135,15 +131,11 @@ func newFileUploadMachine( return nil, err } - f.wg.Add(1) - go func() { - err := cloud.WriteFile(ctx, store, "", &noopReadSeeker{pr}) - if err != nil { - _ = pr.CloseWithError(err) - } - f.wg.Done() - }() - f.writeToFile = pw + f.w, err = store.Writer(ctx, "") + if err != nil { + return nil, err + } + f.failureCleanup = func() { // Ignoring this error because deletion would only fail // if the file was not created in the first place. 
@@ -174,20 +166,24 @@ func CopyInFileStmt(destination, schema, table string) string { ) } -func (f *fileUploadMachine) run(ctx context.Context) (err error) { - err = f.c.run(ctx) - _ = f.writeToFile.Close() +func (f *fileUploadMachine) run(ctx context.Context) error { + err := f.c.run(ctx) + if err != nil { + err = errors.CombineErrors(err, f.w.CloseWithError(err)) + } else { + err = f.w.Close() + } + if err != nil { f.failureCleanup() } - f.wg.Wait() - return + return err } func (f *fileUploadMachine) writeFile(ctx context.Context) error { for _, r := range f.c.rows { b := []byte(*r[0].(*tree.DBytes)) - n, err := f.writeToFile.Write(b) + n, err := f.w.Write(b) if err != nil { return err } @@ -197,7 +193,7 @@ func (f *fileUploadMachine) writeFile(ctx context.Context) error { } // Issue a final zero-byte write to ensure we observe any errors in the pipe. - _, err := f.writeToFile.Write(nil) + _, err := f.w.Write(nil) if err != nil { return err } diff --git a/pkg/storage/cloud/BUILD.bazel b/pkg/storage/cloud/BUILD.bazel index 5ec17d7b8fdc..fcce5c893252 100644 --- a/pkg/storage/cloud/BUILD.bazel +++ b/pkg/storage/cloud/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlutil", + "//pkg/util/ctxgroup", "//pkg/util/log", "//pkg/util/retry", "//pkg/util/sysutil", diff --git a/pkg/storage/cloud/amazon/s3_storage.go b/pkg/storage/cloud/amazon/s3_storage.go index a0bcdfdd0877..aff60ab8cfcf 100644 --- a/pkg/storage/cloud/amazon/s3_storage.go +++ b/pkg/storage/cloud/amazon/s3_storage.go @@ -244,7 +244,7 @@ func s3ErrDelay(err error) time.Duration { return 0 } -func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) { +func (s *s3Storage) newSession(ctx context.Context) (*session.Session, error) { sess, err := session.NewSessionWithOptions(s.opts) if err != nil { return nil, errors.Wrap(err, "new aws session") @@ -259,6 +259,14 @@ func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) { } } sess.Config.Region = aws.String(s.conf.Region) + return sess, err +} + +func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) { + sess, err := s.newSession(ctx) + if err != nil { + return nil, err + } return s3.New(sess), nil } @@ -277,39 +285,27 @@ func (s *s3Storage) Settings() *cluster.Settings { return s.settings } -func (s *s3Storage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - client, err := s.newS3Client(ctx) +func (s *s3Storage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + sess, err := s.newSession(ctx) if err != nil { - return err + return nil, err } - err = contextutil.RunWithTimeout(ctx, "put s3 object", - cloud.Timeout.Get(&s.settings.SV), - func(ctx context.Context) error { - putObjectInput := s3.PutObjectInput{ - Bucket: s.bucket, - Key: aws.String(path.Join(s.prefix, basename)), - Body: content, - } - - // If a server side encryption mode is provided in the URI, we must set - // the header values to enable SSE before writing the file to the s3 - // bucket. - if s.conf.ServerEncMode != "" { - switch s.conf.ServerEncMode { - case string(aes256Enc): - putObjectInput.SetServerSideEncryption(s.conf.ServerEncMode) - case string(kmsEnc): - putObjectInput.SetServerSideEncryption(s.conf.ServerEncMode) - putObjectInput.SetSSEKMSKeyId(s.conf.ServerKMSID) - default: - return errors.Newf("unsupported server encryption mode %s. 
"+ - "Supported values are `aws:kms` and `AES256`.", s.conf.ServerEncMode) - } - } - _, err := client.PutObjectWithContext(ctx, &putObjectInput) - return err + uploader := s3manager.NewUploader(sess) + + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + // Upload the file to S3. + // TODO(dt): test and tune the uploader parameters. + _, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{ + Bucket: s.bucket, + Key: aws.String(path.Join(s.prefix, basename)), + Body: r, + ServerSideEncryption: nilIfEmpty(s.conf.ServerEncMode), + SSEKMSKeyId: nilIfEmpty(s.conf.ServerKMSID), }) - return errors.Wrap(err, "failed to put s3 object") + return errors.Wrap(err, "upload failed") + }), nil } func (s *s3Storage) openStreamAt( @@ -481,6 +477,13 @@ func (s *s3Storage) Close() error { return nil } +func nilIfEmpty(s string) *string { + if s == "" { + return nil + } + return aws.String(s) +} + func init() { cloud.RegisterExternalStorageProvider(roachpb.ExternalStorageProvider_s3, parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3") diff --git a/pkg/storage/cloud/azure/azure_storage.go b/pkg/storage/cloud/azure/azure_storage.go index e7f9629cd090..04f760c0e847 100644 --- a/pkg/storage/cloud/azure/azure_storage.go +++ b/pkg/storage/cloud/azure/azure_storage.go @@ -125,19 +125,18 @@ func (s *azureStorage) Settings() *cluster.Settings { return s.settings } -func (s *azureStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - err := contextutil.RunWithTimeout(ctx, "write azure file", cloud.Timeout.Get(&s.settings.SV), - func(ctx context.Context) error { - blob := s.getBlob(basename) - _, err := blob.Upload( - ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, - azblob.DefaultAccessTier, nil /* blobTagsMap */, azblob.ClientProvidedKeyOptions{}, - ) - return err - }) - return errors.Wrapf(err, "write file: %s", basename) +func (s *azureStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + blob := s.getBlob(basename) + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + _, err := azblob.UploadStreamToBlockBlob( + ctx, r, blob, azblob.UploadStreamToBlockBlobOptions{ + BufferSize: 4 << 20, + }, + ) + return err + }), nil } // ReadFile is shorthand for ReadFileAt with offset 0. diff --git a/pkg/storage/cloud/cloud_io.go b/pkg/storage/cloud/cloud_io.go index 6c267ebc85a0..e909f6156099 100644 --- a/pkg/storage/cloud/cloud_io.go +++ b/pkg/storage/cloud/cloud_io.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/sysutil" @@ -252,8 +253,59 @@ func CheckHTTPContentRangeHeader(h string, pos int64) (int64, error) { return size, nil } +// BackgroundPipe is a helper for providing a Writer that is backed by a pipe +// that has a background process reading from it. It *must* be Closed(). 
+func BackgroundPipe( + ctx context.Context, fn func(ctx context.Context, pr io.Reader) error, +) WriteCloserWithError { + pr, pw := io.Pipe() + w := &backgroundPipe{w: pw, grp: ctxgroup.WithContext(ctx)} + w.grp.GoCtx(func(ctc context.Context) error { + err := fn(ctx, pr) + if err != nil { + closeErr := pr.CloseWithError(err) + err = errors.CombineErrors(err, closeErr) + } else { + err = pr.Close() + } + return err + }) + return w +} + +type backgroundPipe struct { + w *io.PipeWriter + grp ctxgroup.Group +} + +// Write writes to the writer. +func (s *backgroundPipe) Write(p []byte) (int, error) { + return s.w.Write(p) +} + +// Close closes the writer, finishing the write operation. +func (s *backgroundPipe) Close() error { + err := s.w.Close() + return errors.CombineErrors(err, s.grp.Wait()) +} + +// CloseWithError closes the Writer with an error, which may discard prior +// writes and abort the overall write operation. +func (s *backgroundPipe) CloseWithError(err error) error { + e := s.w.CloseWithError(err) + return errors.CombineErrors(e, s.grp.Wait()) +} + // WriteFile is a helper for writing the content of a Reader to the given path // of an ExternalStorage. -func WriteFile(ctx context.Context, dest ExternalStorage, basename string, src io.ReadSeeker) error { - return dest.WriteFile(ctx, basename, src) +func WriteFile(ctx context.Context, dest ExternalStorage, basename string, src io.Reader) error { + w, err := dest.Writer(ctx, basename) + if err != nil { + return errors.Wrap(err, "opening object for writing") + } + if _, err := io.Copy(w, src); err != nil { + closeErr := w.CloseWithError(err) + return errors.CombineErrors(err, closeErr) + } + return errors.Wrap(w.Close(), "closing object") } diff --git a/pkg/storage/cloud/external_storage.go b/pkg/storage/cloud/external_storage.go index 9c429fdbab03..35788d5b0039 100644 --- a/pkg/storage/cloud/external_storage.go +++ b/pkg/storage/cloud/external_storage.go @@ -29,6 +29,14 @@ import ( // This file is for interfaces only and should not contain any implementation // code. All concrete implementations should be added to pkg/storage/cloudimpl. +// WriteCloserWithError extends WriteCloser with an extra CloseWithError func. +type WriteCloserWithError interface { + io.WriteCloser + // CloseWithError closes the writer with an error, which may choose to abort + // rather than complete any write operations. + CloseWithError(error) error +} + // ExternalStorage provides an API to read and write files in some storage, // namely various cloud storage providers, for example to store backups. // Generally an implementation is instantiated pointing to some base path or @@ -65,8 +73,10 @@ type ExternalStorage interface { // This can be leveraged for an existence check. ReadFileAt(ctx context.Context, basename string, offset int64) (io.ReadCloser, int64, error) - // WriteFile should write the content to requested name. - WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error + // Writer returns a writer for the requested name. The returned writer must + // be closed by the caller to complete the write, or can be closed with an + // error which may, depending on the implementation, abort the write. + Writer(ctx context.Context, basename string) (WriteCloserWithError, error) // ListFiles returns files that match a globs-style pattern. 
The returned // results are usually relative to the base path, meaning an ExternalStorage diff --git a/pkg/storage/cloud/gcp/BUILD.bazel b/pkg/storage/cloud/gcp/BUILD.bazel index 3bc6b607f885..1eb45f50a612 100644 --- a/pkg/storage/cloud/gcp/BUILD.bazel +++ b/pkg/storage/cloud/gcp/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "//pkg/settings/cluster", "//pkg/storage/cloud", "//pkg/util/contextutil", - "//pkg/util/retry", "@com_github_cockroachdb_errors//:errors", "@com_google_cloud_go_storage//:storage", "@org_golang_google_api//iterator", diff --git a/pkg/storage/cloud/gcp/gcs_storage.go b/pkg/storage/cloud/gcp/gcs_storage.go index 306e6cd65f4c..5375c05ce433 100644 --- a/pkg/storage/cloud/gcp/gcs_storage.go +++ b/pkg/storage/cloud/gcp/gcs_storage.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/util/contextutil" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" "golang.org/x/oauth2/google" "google.golang.org/api/iterator" @@ -157,24 +156,11 @@ func makeGCSStorage( }, nil } -func (g *gcsStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - const maxAttempts = 3 - err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { - if _, err := content.Seek(0, io.SeekStart); err != nil { - return err - } - // Set the timeout within the retry loop. - return contextutil.RunWithTimeout(ctx, "put gcs file", cloud.Timeout.Get(&g.settings.SV), - func(ctx context.Context) error { - w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(ctx) - if _, err := io.Copy(w, content); err != nil { - _ = w.Close() - return err - } - return w.Close() - }) - }) - return errors.Wrap(err, "write to google cloud") +func (g *gcsStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(ctx) + return w, nil } // ReadFile is shorthand for ReadFileAt with offset 0. 
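Taken together, the interface change above and the updated cloud.WriteFile helper give callers the following convention. This is a minimal sketch mirroring the helper's implementation; putObject and its arguments are illustrative names, not part of this patch:

    // putObject shows the push-based convention: obtain a Writer, copy into
    // it, then Close to finish the upload or CloseWithError to abort it.
    func putObject(ctx context.Context, store cloud.ExternalStorage, name string, src io.Reader) error {
        w, err := store.Writer(ctx, name)
        if err != nil {
            return err
        }
        if _, err := io.Copy(w, src); err != nil {
            // Aborting may, depending on the implementation, discard prior writes.
            _ = w.CloseWithError(err)
            return err
        }
        return w.Close()
    }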
diff --git a/pkg/storage/cloud/httpsink/http_storage.go b/pkg/storage/cloud/httpsink/http_storage.go index ff160f0701d8..01bcddb90825 100644 --- a/pkg/storage/cloud/httpsink/http_storage.go +++ b/pkg/storage/cloud/httpsink/http_storage.go @@ -175,12 +175,13 @@ func (h *httpStorage) ReadFileAt( return stream.Body, size, nil } -func (h *httpStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - return contextutil.RunWithTimeout(ctx, fmt.Sprintf("PUT %s", basename), - cloud.Timeout.Get(&h.settings.SV), func(ctx context.Context) error { - _, err := h.reqNoBody(ctx, "PUT", basename, content) - return err - }) +func (h *httpStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + _, err := h.reqNoBody(ctx, "PUT", basename, r) + return err + }), nil } func (h *httpStorage) ListFiles(_ context.Context, _ string) ([]string, error) { diff --git a/pkg/storage/cloud/nodelocal/nodelocal_storage.go b/pkg/storage/cloud/nodelocal/nodelocal_storage.go index 1a02ae430dfd..27daa2241b0c 100644 --- a/pkg/storage/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/storage/cloud/nodelocal/nodelocal_storage.go @@ -128,10 +128,13 @@ func joinRelativePath(filePath string, file string) string { return path.Join(".", filePath, file) } -func (l *localFileStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - return l.blobClient.WriteFile(ctx, joinRelativePath(l.base, basename), content) +func (l *localFileStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + // TODO(dt): don't need this pipe. + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + return l.blobClient.WriteFile(ctx, joinRelativePath(l.base, basename), r) + }), nil } // ReadFile is shorthand for ReadFileAt with offset 0. 
diff --git a/pkg/storage/cloud/nullsink/nullsink_storage.go b/pkg/storage/cloud/nullsink/nullsink_storage.go index 97fbb3a6015a..a8c3a398967f 100644 --- a/pkg/storage/cloud/nullsink/nullsink_storage.go +++ b/pkg/storage/cloud/nullsink/nullsink_storage.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "net/url" "github.com/cockroachdb/cockroach/pkg/base" @@ -36,6 +35,8 @@ func MakeNullSinkStorageURI(path string) string { type nullSinkStorage struct { } +var _ cloud.ExternalStorage = &nullSinkStorage{} + func makeNullSinkStorage( _ context.Context, _ cloud.ExternalStorageContext, _ roachpb.ExternalStorage, ) (cloud.ExternalStorage, error) { @@ -70,9 +71,14 @@ func (n *nullSinkStorage) ReadFileAt( return nil, 0, io.EOF } -func (n *nullSinkStorage) WriteFile(_ context.Context, _ string, content io.ReadSeeker) error { - _, err := io.Copy(ioutil.Discard, content) - return err +type nullWriter struct{} + +func (nullWriter) Write(p []byte) (int, error) { return len(p), nil } +func (nullWriter) Close() error { return nil } +func (nullWriter) CloseWithError(_ error) error { return nil } + +func (n *nullSinkStorage) Writer(_ context.Context, _ string) (cloud.WriteCloserWithError, error) { + return nullWriter{}, nil } func (n *nullSinkStorage) ListFiles(_ context.Context, _ string) ([]string, error) { diff --git a/pkg/storage/cloud/userfile/file_table_storage.go b/pkg/storage/cloud/userfile/file_table_storage.go index 43b8d95e3e44..276afec2ed1a 100644 --- a/pkg/storage/cloud/userfile/file_table_storage.go +++ b/pkg/storage/cloud/userfile/file_table_storage.go @@ -239,53 +239,23 @@ func (f *fileTableStorage) ReadFileAt( return reader, size, err } -// WriteFile implements the ExternalStorage interface and writes the file to the +// Writer implements the ExternalStorage interface and writes the file to the // user scoped FileToTableSystem. -func (f *fileTableStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { +func (f *fileTableStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { filepath, err := checkBaseAndJoinFilePath(f.prefix, basename) if err != nil { - return err + return nil, err } // This is only possible if the method is invoked by a SQLConnFileTableStorage // which should never be the case. if f.ie == nil { - return errors.New("cannot WriteFile without a configured internal executor") - } - - defer func() { - _, _ = f.ie.Exec(ctx, "userfile-write-file-commit", nil /* txn */, `COMMIT`) - }() - - // We open an explicit txn within which we will write the file metadata entry - // and payload chunks to the userfile tables. We cannot perform these - // operations within a db.Txn retry loop because when coming from the - // copyMachine (which backs the userfile CLI upload command), we do not have - // access to all the file data at once. As a result of which, if a txn were to - // retry we are not able to seek to the start of `content` and try again, - // resulting in bytes being missed across txn retry attempts. - // See chunkWriter.WriteFile for more information about writing semantics. 
- _, err = f.ie.Exec(ctx, "userfile-write-file-txn", nil /* txn */, `BEGIN`) - if err != nil { - return err - } - - writer, err := f.fs.NewFileWriter(ctx, filepath, filetable.ChunkDefaultSize) - if err != nil { - return err - } - - if _, err = io.Copy(writer, content); err != nil { - return errors.Wrap(err, "failed to write using the FileTable writer") - } - - if err := writer.Close(); err != nil { - return errors.Wrap(err, "failed to close the FileTable writer") + return nil, errors.New("cannot Write without a configured internal executor") } - return err + return f.fs.NewFileWriter(ctx, filepath, filetable.ChunkDefaultSize) } // getPrefixAndPattern takes a prefix and optionally suffix of a path pattern diff --git a/pkg/storage/cloudimpl/filetable/file_table_read_writer.go b/pkg/storage/cloudimpl/filetable/file_table_read_writer.go index 73d0955666e0..0f7f07de9968 100644 --- a/pkg/storage/cloudimpl/filetable/file_table_read_writer.go +++ b/pkg/storage/cloudimpl/filetable/file_table_read_writer.go @@ -597,6 +597,10 @@ func (w *chunkWriter) Write(buf []byte) (int, error) { return bufLen, nil } +func (w *chunkWriter) CloseWithError(err error) error { + return w.Close() +} + // Close implements the io.Closer interface by flushing the underlying writer // thereby writing remaining data to the Payload table. It also updates the file // metadata entry in the File table with the number of bytes written. @@ -918,7 +922,7 @@ users WHERE NOT "username" = 'root' AND NOT "username" = 'admin' AND NOT "userna // the last chunk and commit the txn within which all writes occur. func (f *FileToTableSystem) NewFileWriter( ctx context.Context, filename string, chunkSize int, -) (io.WriteCloser, error) { +) (cloud.WriteCloserWithError, error) { e, err := resolveInternalFileToTableExecutor(f.executor) if err != nil { return nil, err