From cd4ab2ac1578e9b1eed517ef50f18f4b2ae2cdb5 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Wed, 12 May 2021 14:19:51 +0000
Subject: [PATCH] storage/cloud: replace WriteFile(Reader) with Writer

This changes the ExternalStorage API's write method. WriteFile, which
took an io.ReadSeeker and wrote its content to the requested
destination (returning any error encountered while doing so), is
replaced by Writer(), which returns an io.Writer pointing to the
destination. The returned writer can be written to incrementally and
then closed to finish the upload, or closed via CloseWithError to
cancel it.

All existing callers use the WriteFile shim and are unaffected, but
they can later switch to the push-based model and use the Writer
directly; that migration is left to a follow-up change.

Release note: none.
---
 pkg/blobs/client.go                           |  8 +--
 pkg/ccl/importccl/testutils_test.go           |  8 +--
 pkg/sql/copy_file_upload.go                   | 40 ++++++------
 pkg/storage/cloud/BUILD.bazel                 |  1 +
 pkg/storage/cloud/amazon/s3_storage.go        | 64 ++++++++++---------
 pkg/storage/cloud/azure/azure_storage.go      | 25 ++++----
 pkg/storage/cloud/cloud_io.go                 | 53 ++++++++++++++-
 pkg/storage/cloud/external_storage.go         | 14 +++-
 pkg/storage/cloud/gcp/BUILD.bazel             |  1 -
 pkg/storage/cloud/gcp/gcs_storage.go          | 24 ++-----
 pkg/storage/cloud/httpsink/http_storage.go    | 13 ++--
 .../cloud/nodelocal/nodelocal_storage.go      | 11 ++--
 .../cloud/nullsink/nullsink_storage.go        | 14 ++--
 .../cloud/userfile/file_table_storage.go      | 44 ++-----------
 .../filetable/file_table_read_writer.go       |  6 +-
 15 files changed, 176 insertions(+), 150 deletions(-)

diff --git a/pkg/blobs/client.go b/pkg/blobs/client.go
index d5e4cd43bbd0..bfd75f8512e6 100644
--- a/pkg/blobs/client.go
+++ b/pkg/blobs/client.go
@@ -34,7 +34,7 @@ type BlobClient interface {
 	// WriteFile sends the named payload to the requested node.
 	// This method will read entire content of file and send
 	// it over to another node, based on the nodeID.
-	WriteFile(ctx context.Context, file string, content io.ReadSeeker) error
+	WriteFile(ctx context.Context, file string, content io.Reader) error
 
 	// List lists the corresponding filenames from the requested node.
 	// The requested node can be the current node.
@@ -75,9 +75,7 @@ func (c *remoteClient) ReadFile( return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file") } -func (c *remoteClient) WriteFile( - ctx context.Context, file string, content io.ReadSeeker, -) (err error) { +func (c *remoteClient) WriteFile(ctx context.Context, file string, content io.Reader) (err error) { ctx = metadata.AppendToOutgoingContext(ctx, "filename", file) stream, err := c.blobClient.PutStream(ctx) if err != nil { @@ -143,7 +141,7 @@ func (c *localClient) ReadFile( return c.localStorage.ReadFile(file, offset) } -func (c *localClient) WriteFile(ctx context.Context, file string, content io.ReadSeeker) error { +func (c *localClient) WriteFile(ctx context.Context, file string, content io.Reader) error { return c.localStorage.WriteFile(file, content) } diff --git a/pkg/ccl/importccl/testutils_test.go b/pkg/ccl/importccl/testutils_test.go index 6fd116aa86b7..77009a108fd3 100644 --- a/pkg/ccl/importccl/testutils_test.go +++ b/pkg/ccl/importccl/testutils_test.go @@ -271,10 +271,10 @@ func (es *generatorExternalStorage) Size(ctx context.Context, basename string) ( return int64(es.gen.size), nil } -func (es *generatorExternalStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - return errors.New("unsupported") +func (es *generatorExternalStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + return nil, errors.New("unsupported") } func (es *generatorExternalStorage) ListFiles(ctx context.Context, _ string) ([]string, error) { diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go index 89dc7e1246d4..9f960aa9c631 100644 --- a/pkg/sql/copy_file_upload.go +++ b/pkg/sql/copy_file_upload.go @@ -16,7 +16,6 @@ import ( "io" "net/url" "strings" - "sync" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/lex" @@ -42,8 +41,7 @@ var _ copyMachineInterface = &fileUploadMachine{} type fileUploadMachine struct { c *copyMachine - writeToFile *io.PipeWriter - wg *sync.WaitGroup + w cloud.WriteCloserWithError failureCleanup func() } @@ -94,8 +92,7 @@ func newFileUploadMachine( p: planner{execCfg: execCfg, alloc: &rowenc.DatumAlloc{}}, } f = &fileUploadMachine{ - c: c, - wg: &sync.WaitGroup{}, + c: c, } // We need a planner to do the initial planning, even if a planner @@ -124,7 +121,6 @@ func newFileUploadMachine( return nil, err } - pr, pw := io.Pipe() store, err := c.p.execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, dest, c.p.User()) if err != nil { return nil, err @@ -135,15 +131,11 @@ func newFileUploadMachine( return nil, err } - f.wg.Add(1) - go func() { - err := cloud.WriteFile(ctx, "", &noopReadSeeker{pr}, store) - if err != nil { - _ = pr.CloseWithError(err) - } - f.wg.Done() - }() - f.writeToFile = pw + f.w, err = store.Writer(ctx, "") + if err != nil { + return nil, err + } + f.failureCleanup = func() { // Ignoring this error because deletion would only fail // if the file was not created in the first place. 
@@ -174,20 +166,24 @@ func CopyInFileStmt(destination, schema, table string) string { ) } -func (f *fileUploadMachine) run(ctx context.Context) (err error) { - err = f.c.run(ctx) - _ = f.writeToFile.Close() +func (f *fileUploadMachine) run(ctx context.Context) error { + err := f.c.run(ctx) + if err != nil { + err = errors.CombineErrors(err, f.w.CloseWithError(err)) + } else { + err = f.w.Close() + } + if err != nil { f.failureCleanup() } - f.wg.Wait() - return + return err } func (f *fileUploadMachine) writeFile(ctx context.Context) error { for _, r := range f.c.rows { b := []byte(*r[0].(*tree.DBytes)) - n, err := f.writeToFile.Write(b) + n, err := f.w.Write(b) if err != nil { return err } @@ -197,7 +193,7 @@ func (f *fileUploadMachine) writeFile(ctx context.Context) error { } // Issue a final zero-byte write to ensure we observe any errors in the pipe. - _, err := f.writeToFile.Write(nil) + _, err := f.w.Write(nil) if err != nil { return err } diff --git a/pkg/storage/cloud/BUILD.bazel b/pkg/storage/cloud/BUILD.bazel index 5ec17d7b8fdc..fcce5c893252 100644 --- a/pkg/storage/cloud/BUILD.bazel +++ b/pkg/storage/cloud/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlutil", + "//pkg/util/ctxgroup", "//pkg/util/log", "//pkg/util/retry", "//pkg/util/sysutil", diff --git a/pkg/storage/cloud/amazon/s3_storage.go b/pkg/storage/cloud/amazon/s3_storage.go index a0bcdfdd0877..cb6e763a1e04 100644 --- a/pkg/storage/cloud/amazon/s3_storage.go +++ b/pkg/storage/cloud/amazon/s3_storage.go @@ -244,7 +244,7 @@ func s3ErrDelay(err error) time.Duration { return 0 } -func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) { +func (s *s3Storage) newSession(ctx context.Context) (*session.Session, error) { sess, err := session.NewSessionWithOptions(s.opts) if err != nil { return nil, errors.Wrap(err, "new aws session") @@ -259,6 +259,14 @@ func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) { } } sess.Config.Region = aws.String(s.conf.Region) + return sess, err +} + +func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) { + sess, err := s.newSession(ctx) + if err != nil { + return nil, err + } return s3.New(sess), nil } @@ -277,39 +285,26 @@ func (s *s3Storage) Settings() *cluster.Settings { return s.settings } -func (s *s3Storage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - client, err := s.newS3Client(ctx) +func (s *s3Storage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + sess, err := s.newSession(ctx) if err != nil { - return err + return nil, err } - err = contextutil.RunWithTimeout(ctx, "put s3 object", - cloud.Timeout.Get(&s.settings.SV), - func(ctx context.Context) error { - putObjectInput := s3.PutObjectInput{ - Bucket: s.bucket, - Key: aws.String(path.Join(s.prefix, basename)), - Body: content, - } - - // If a server side encryption mode is provided in the URI, we must set - // the header values to enable SSE before writing the file to the s3 - // bucket. - if s.conf.ServerEncMode != "" { - switch s.conf.ServerEncMode { - case string(aes256Enc): - putObjectInput.SetServerSideEncryption(s.conf.ServerEncMode) - case string(kmsEnc): - putObjectInput.SetServerSideEncryption(s.conf.ServerEncMode) - putObjectInput.SetSSEKMSKeyId(s.conf.ServerKMSID) - default: - return errors.Newf("unsupported server encryption mode %s. 
"+ - "Supported values are `aws:kms` and `AES256`.", s.conf.ServerEncMode) - } - } - _, err := client.PutObjectWithContext(ctx, &putObjectInput) - return err + uploader := s3manager.NewUploader(sess) + + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + // Upload the file to S3. + _, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{ + Bucket: s.bucket, + Key: aws.String(path.Join(s.prefix, basename)), + Body: r, + ServerSideEncryption: nilIfEmpty(s.conf.ServerEncMode), + SSEKMSKeyId: nilIfEmpty(s.conf.ServerKMSID), }) - return errors.Wrap(err, "failed to put s3 object") + return errors.Wrap(err, "upload failed") + }), nil } func (s *s3Storage) openStreamAt( @@ -481,6 +476,13 @@ func (s *s3Storage) Close() error { return nil } +func nilIfEmpty(s string) *string { + if s == "" { + return nil + } + return aws.String(s) +} + func init() { cloud.RegisterExternalStorageProvider(roachpb.ExternalStorageProvider_s3, parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3") diff --git a/pkg/storage/cloud/azure/azure_storage.go b/pkg/storage/cloud/azure/azure_storage.go index e7f9629cd090..04f760c0e847 100644 --- a/pkg/storage/cloud/azure/azure_storage.go +++ b/pkg/storage/cloud/azure/azure_storage.go @@ -125,19 +125,18 @@ func (s *azureStorage) Settings() *cluster.Settings { return s.settings } -func (s *azureStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - err := contextutil.RunWithTimeout(ctx, "write azure file", cloud.Timeout.Get(&s.settings.SV), - func(ctx context.Context) error { - blob := s.getBlob(basename) - _, err := blob.Upload( - ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, - azblob.DefaultAccessTier, nil /* blobTagsMap */, azblob.ClientProvidedKeyOptions{}, - ) - return err - }) - return errors.Wrapf(err, "write file: %s", basename) +func (s *azureStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + blob := s.getBlob(basename) + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + _, err := azblob.UploadStreamToBlockBlob( + ctx, r, blob, azblob.UploadStreamToBlockBlobOptions{ + BufferSize: 4 << 20, + }, + ) + return err + }), nil } // ReadFile is shorthand for ReadFileAt with offset 0. diff --git a/pkg/storage/cloud/cloud_io.go b/pkg/storage/cloud/cloud_io.go index c627e55cd924..594be3f6d6ee 100644 --- a/pkg/storage/cloud/cloud_io.go +++ b/pkg/storage/cloud/cloud_io.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/sysutil" @@ -252,8 +253,58 @@ func CheckHTTPContentRangeHeader(h string, pos int64) (int64, error) { return size, nil } +// BackgroundPipe is a helper for providing a Writer that is backed by a pipe +// that has a background process reading from it. It *must* be Closed(). 
+func BackgroundPipe(
+	ctx context.Context, fn func(ctx context.Context, pr io.Reader) error,
+) WriteCloserWithError {
+	pr, pw := io.Pipe()
+	w := &backgroundPipe{w: pw, grp: ctxgroup.WithContext(ctx)}
+	w.grp.GoCtx(func(ctx context.Context) error {
+		err := fn(ctx, pr)
+		if err != nil {
+			_ = pr.CloseWithError(err)
+		} else {
+			err = pr.Close()
+		}
+		return err
+	})
+	return w
+}
+
+type backgroundPipe struct {
+	w   *io.PipeWriter
+	grp ctxgroup.Group
+}
+
+// Write writes to the writer.
+func (s *backgroundPipe) Write(p []byte) (int, error) {
+	return s.w.Write(p)
+}
+
+// Close closes the writer, finishing the write operation.
+func (s *backgroundPipe) Close() error {
+	err := s.w.Close()
+	return errors.CombineErrors(err, s.grp.Wait())
+}
+
+// CloseWithError closes the Writer with an error, which may discard prior
+// writes and abort the overall write operation.
+func (s *backgroundPipe) CloseWithError(err error) error {
+	e := s.w.CloseWithError(err)
+	return errors.CombineErrors(e, s.grp.Wait())
+}
+
 // WriteFile is a helper for writing the content of a Reader to the given path
 // of an ExternalStorage.
 func WriteFile(ctx context.Context, basename string, src io.Reader, dest ExternalStorage) error {
-	return dest.WriteFile(ctx, basename, src)
+	w, err := dest.Writer(ctx, basename)
+	if err != nil {
+		return errors.Wrap(err, "opening object for writing")
+	}
+	if _, err := io.Copy(w, src); err != nil {
+		_ = w.CloseWithError(err)
+		return err
+	}
+	return errors.Wrap(w.Close(), "closing object")
 }
diff --git a/pkg/storage/cloud/external_storage.go b/pkg/storage/cloud/external_storage.go
index 9c429fdbab03..35788d5b0039 100644
--- a/pkg/storage/cloud/external_storage.go
+++ b/pkg/storage/cloud/external_storage.go
@@ -29,6 +29,14 @@ import (
 // This file is for interfaces only and should not contain any implementation
 // code. All concrete implementations should be added to pkg/storage/cloudimpl.
 
+// WriteCloserWithError extends WriteCloser with an extra CloseWithError func.
+type WriteCloserWithError interface {
+	io.WriteCloser
+	// CloseWithError closes the writer with an error, which may choose to abort
+	// rather than complete any write operations.
+	CloseWithError(error) error
+}
+
 // ExternalStorage provides an API to read and write files in some storage,
 // namely various cloud storage providers, for example to store backups.
 // Generally an implementation is instantiated pointing to some base path or
@@ -65,8 +73,10 @@
 	// This can be leveraged for an existence check.
 	ReadFileAt(ctx context.Context, basename string, offset int64) (io.ReadCloser, int64, error)
 
-	// WriteFile should write the content to requested name.
-	WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error
+	// Writer returns a writer for the requested name. The returned writer must
+	// be closed by the caller to complete the write, or can be closed with an
+	// error which may, depending on the implementation, abort the write.
+	Writer(ctx context.Context, basename string) (WriteCloserWithError, error)
 
 	// ListFiles returns files that match a globs-style pattern.
The returned // results are usually relative to the base path, meaning an ExternalStorage diff --git a/pkg/storage/cloud/gcp/BUILD.bazel b/pkg/storage/cloud/gcp/BUILD.bazel index f8172904c808..68d64a490a04 100644 --- a/pkg/storage/cloud/gcp/BUILD.bazel +++ b/pkg/storage/cloud/gcp/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "//pkg/settings/cluster", "//pkg/storage/cloud", "//pkg/util/contextutil", - "//pkg/util/retry", "@com_github_cockroachdb_errors//:errors", "@com_google_cloud_go_storage//:storage", "@org_golang_google_api//iterator", diff --git a/pkg/storage/cloud/gcp/gcs_storage.go b/pkg/storage/cloud/gcp/gcs_storage.go index 306e6cd65f4c..5375c05ce433 100644 --- a/pkg/storage/cloud/gcp/gcs_storage.go +++ b/pkg/storage/cloud/gcp/gcs_storage.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/util/contextutil" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" "golang.org/x/oauth2/google" "google.golang.org/api/iterator" @@ -157,24 +156,11 @@ func makeGCSStorage( }, nil } -func (g *gcsStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - const maxAttempts = 3 - err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error { - if _, err := content.Seek(0, io.SeekStart); err != nil { - return err - } - // Set the timeout within the retry loop. - return contextutil.RunWithTimeout(ctx, "put gcs file", cloud.Timeout.Get(&g.settings.SV), - func(ctx context.Context) error { - w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(ctx) - if _, err := io.Copy(w, content); err != nil { - _ = w.Close() - return err - } - return w.Close() - }) - }) - return errors.Wrap(err, "write to google cloud") +func (g *gcsStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(ctx) + return w, nil } // ReadFile is shorthand for ReadFileAt with offset 0. 
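The S3 and Azure sinks above, and the HTTP and nodelocal sinks below, all rely on the same adaptation: cloud.BackgroundPipe turns a pull-based upload function (one that drains an io.Reader) into the push-based Writer the new interface requires. A minimal, self-contained sketch of that pipe-plus-goroutine pattern, with the ctxgroup plumbing omitted (pipeWriter, newPipeWriter, and uploadFn are illustrative names, not code from this patch):

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// pipeWriter adapts a pull-based upload function (one that drains an
// io.Reader) into a push-based writer.
type pipeWriter struct {
	w    *io.PipeWriter
	done chan error // carries the background upload's result
}

func newPipeWriter(
	ctx context.Context, uploadFn func(context.Context, io.Reader) error,
) *pipeWriter {
	pr, pw := io.Pipe()
	done := make(chan error, 1)
	go func() {
		err := uploadFn(ctx, pr)
		// Closing the read side with the upload's error propagates it to any
		// in-flight or future Write calls on the write side.
		_ = pr.CloseWithError(err)
		done <- err
	}()
	return &pipeWriter{w: pw, done: done}
}

func (p *pipeWriter) Write(b []byte) (int, error) { return p.w.Write(b) }

// Close signals EOF to the background upload and waits for it to finish, so
// the caller observes any upload error.
func (p *pipeWriter) Close() error {
	if err := p.w.Close(); err != nil {
		return err
	}
	return <-p.done
}

func main() {
	var buf bytes.Buffer
	w := newPipeWriter(context.Background(),
		func(_ context.Context, r io.Reader) error {
			_, err := io.Copy(&buf, r) // stand-in for a real upload call
			return err
		})
	_, _ = w.Write([]byte("hello"))
	if err := w.Close(); err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	fmt.Println("uploaded:", buf.String())
}

GCS, just above, is the exception: its SDK already exposes a push-based writer, so Writer can return it directly without a pipe.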
diff --git a/pkg/storage/cloud/httpsink/http_storage.go b/pkg/storage/cloud/httpsink/http_storage.go index ff160f0701d8..01bcddb90825 100644 --- a/pkg/storage/cloud/httpsink/http_storage.go +++ b/pkg/storage/cloud/httpsink/http_storage.go @@ -175,12 +175,13 @@ func (h *httpStorage) ReadFileAt( return stream.Body, size, nil } -func (h *httpStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error { - return contextutil.RunWithTimeout(ctx, fmt.Sprintf("PUT %s", basename), - cloud.Timeout.Get(&h.settings.SV), func(ctx context.Context) error { - _, err := h.reqNoBody(ctx, "PUT", basename, content) - return err - }) +func (h *httpStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + _, err := h.reqNoBody(ctx, "PUT", basename, r) + return err + }), nil } func (h *httpStorage) ListFiles(_ context.Context, _ string) ([]string, error) { diff --git a/pkg/storage/cloud/nodelocal/nodelocal_storage.go b/pkg/storage/cloud/nodelocal/nodelocal_storage.go index 1a02ae430dfd..27daa2241b0c 100644 --- a/pkg/storage/cloud/nodelocal/nodelocal_storage.go +++ b/pkg/storage/cloud/nodelocal/nodelocal_storage.go @@ -128,10 +128,13 @@ func joinRelativePath(filePath string, file string) string { return path.Join(".", filePath, file) } -func (l *localFileStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { - return l.blobClient.WriteFile(ctx, joinRelativePath(l.base, basename), content) +func (l *localFileStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { + // TODO(dt): don't need this pipe. + return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error { + return l.blobClient.WriteFile(ctx, joinRelativePath(l.base, basename), r) + }), nil } // ReadFile is shorthand for ReadFileAt with offset 0. 
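As the nodelocal TODO above notes, not every sink actually needs the pipe; meanwhile, existing pull-style callers keep working through the cloud.WriteFile shim from cloud_io.go. A hypothetical call site of that shim (store, src, and the object name are placeholders):

	// store is any cloud.ExternalStorage; src is any io.Reader. The shim
	// opens the Writer, copies src into it, and closes it, or aborts via
	// CloseWithError if the copy fails.
	if err := cloud.WriteFile(ctx, "data.csv", src, store); err != nil {
		return err
	}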
diff --git a/pkg/storage/cloud/nullsink/nullsink_storage.go b/pkg/storage/cloud/nullsink/nullsink_storage.go index 97fbb3a6015a..a8c3a398967f 100644 --- a/pkg/storage/cloud/nullsink/nullsink_storage.go +++ b/pkg/storage/cloud/nullsink/nullsink_storage.go @@ -14,7 +14,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "net/url" "github.com/cockroachdb/cockroach/pkg/base" @@ -36,6 +35,8 @@ func MakeNullSinkStorageURI(path string) string { type nullSinkStorage struct { } +var _ cloud.ExternalStorage = &nullSinkStorage{} + func makeNullSinkStorage( _ context.Context, _ cloud.ExternalStorageContext, _ roachpb.ExternalStorage, ) (cloud.ExternalStorage, error) { @@ -70,9 +71,14 @@ func (n *nullSinkStorage) ReadFileAt( return nil, 0, io.EOF } -func (n *nullSinkStorage) WriteFile(_ context.Context, _ string, content io.ReadSeeker) error { - _, err := io.Copy(ioutil.Discard, content) - return err +type nullWriter struct{} + +func (nullWriter) Write(p []byte) (int, error) { return len(p), nil } +func (nullWriter) Close() error { return nil } +func (nullWriter) CloseWithError(_ error) error { return nil } + +func (n *nullSinkStorage) Writer(_ context.Context, _ string) (cloud.WriteCloserWithError, error) { + return nullWriter{}, nil } func (n *nullSinkStorage) ListFiles(_ context.Context, _ string) ([]string, error) { diff --git a/pkg/storage/cloud/userfile/file_table_storage.go b/pkg/storage/cloud/userfile/file_table_storage.go index 43b8d95e3e44..276afec2ed1a 100644 --- a/pkg/storage/cloud/userfile/file_table_storage.go +++ b/pkg/storage/cloud/userfile/file_table_storage.go @@ -239,53 +239,23 @@ func (f *fileTableStorage) ReadFileAt( return reader, size, err } -// WriteFile implements the ExternalStorage interface and writes the file to the +// Writer implements the ExternalStorage interface and writes the file to the // user scoped FileToTableSystem. -func (f *fileTableStorage) WriteFile( - ctx context.Context, basename string, content io.ReadSeeker, -) error { +func (f *fileTableStorage) Writer( + ctx context.Context, basename string, +) (cloud.WriteCloserWithError, error) { filepath, err := checkBaseAndJoinFilePath(f.prefix, basename) if err != nil { - return err + return nil, err } // This is only possible if the method is invoked by a SQLConnFileTableStorage // which should never be the case. if f.ie == nil { - return errors.New("cannot WriteFile without a configured internal executor") - } - - defer func() { - _, _ = f.ie.Exec(ctx, "userfile-write-file-commit", nil /* txn */, `COMMIT`) - }() - - // We open an explicit txn within which we will write the file metadata entry - // and payload chunks to the userfile tables. We cannot perform these - // operations within a db.Txn retry loop because when coming from the - // copyMachine (which backs the userfile CLI upload command), we do not have - // access to all the file data at once. As a result of which, if a txn were to - // retry we are not able to seek to the start of `content` and try again, - // resulting in bytes being missed across txn retry attempts. - // See chunkWriter.WriteFile for more information about writing semantics. 
-	_, err = f.ie.Exec(ctx, "userfile-write-file-txn", nil /* txn */, `BEGIN`)
-	if err != nil {
-		return err
-	}
-
-	writer, err := f.fs.NewFileWriter(ctx, filepath, filetable.ChunkDefaultSize)
-	if err != nil {
-		return err
-	}
-
-	if _, err = io.Copy(writer, content); err != nil {
-		return errors.Wrap(err, "failed to write using the FileTable writer")
-	}
-
-	if err := writer.Close(); err != nil {
-		return errors.Wrap(err, "failed to close the FileTable writer")
+		return nil, errors.New("cannot Write without a configured internal executor")
 	}
 
-	return err
+	return f.fs.NewFileWriter(ctx, filepath, filetable.ChunkDefaultSize)
 }
 
 // getPrefixAndPattern takes a prefix and optionally suffix of a path pattern
diff --git a/pkg/storage/cloudimpl/filetable/file_table_read_writer.go b/pkg/storage/cloudimpl/filetable/file_table_read_writer.go
index 73d0955666e0..0f7f07de9968 100644
--- a/pkg/storage/cloudimpl/filetable/file_table_read_writer.go
+++ b/pkg/storage/cloudimpl/filetable/file_table_read_writer.go
@@ -597,6 +597,12 @@ func (w *chunkWriter) Write(buf []byte) (int, error) {
 	return bufLen, nil
 }
 
+// CloseWithError closes the writer, ignoring the passed error: any buffered
+// data is still flushed and committed, exactly as in Close.
+func (w *chunkWriter) CloseWithError(_ error) error {
+	return w.Close()
+}
+
 // Close implements the io.Closer interface by flushing the underlying writer
 // thereby writing remaining data to the Payload table. It also updates the file
 // metadata entry in the File table with the number of bytes written.
@@ -918,7 +922,7 @@ users WHERE NOT "username" = 'root' AND NOT "username" = 'admin' AND NOT "userna
 // the last chunk and commit the txn within which all writes occur.
 func (f *FileToTableSystem) NewFileWriter(
 	ctx context.Context, filename string, chunkSize int,
-) (io.WriteCloser, error) {
+) (cloud.WriteCloserWithError, error) {
 	e, err := resolveInternalFileToTableExecutor(f.executor)
 	if err != nil {
 		return nil, err
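With the interface change in place, a future push-based call site could look roughly like this sketch against the new API (store, rows, and the object name are placeholder names, not code from this patch):

	w, err := store.Writer(ctx, "data.csv")
	if err != nil {
		return err
	}
	for _, row := range rows {
		if _, err := w.Write(row); err != nil {
			// Abort; depending on the implementation this may discard prior writes.
			_ = w.CloseWithError(err)
			return err
		}
	}
	// Close completes the upload and surfaces any deferred write error.
	return w.Close()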