Skip to content

Commit

Permalink
storage/cloud: replace WriteFile(Reader) with Writer
Browse files Browse the repository at this point in the history
This changes the ExternalStorage API's writing method from WriteFile which takes an io.Reader
and writes its content to the requested destination or returns an error encountered while doing
so to instead have Writer() which returns an io.Writer pointing to the destination that can be
written to later and then closed to finish the upload (or CloseWithError'ed to cancel it).

All existing callers use the shim and are unaffected, but can later choose to change to a
push-based model and use the Writer directly. This is left to a follow-up change.

Release note: none.
  • Loading branch information
dt committed May 13, 2021
1 parent 8b944e6 commit ac52981
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 151 deletions.
8 changes: 3 additions & 5 deletions pkg/blobs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BlobClient interface {
// WriteFile sends the named payload to the requested node.
// This method will read entire content of file and send
// it over to another node, based on the nodeID.
WriteFile(ctx context.Context, file string, content io.ReadSeeker) error
WriteFile(ctx context.Context, file string, content io.Reader) error

// List lists the corresponding filenames from the requested node.
// The requested node can be the current node.
Expand Down Expand Up @@ -75,9 +75,7 @@ func (c *remoteClient) ReadFile(
return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file")
}

func (c *remoteClient) WriteFile(
ctx context.Context, file string, content io.ReadSeeker,
) (err error) {
func (c *remoteClient) WriteFile(ctx context.Context, file string, content io.Reader) (err error) {
ctx = metadata.AppendToOutgoingContext(ctx, "filename", file)
stream, err := c.blobClient.PutStream(ctx)
if err != nil {
Expand Down Expand Up @@ -143,7 +141,7 @@ func (c *localClient) ReadFile(
return c.localStorage.ReadFile(file, offset)
}

func (c *localClient) WriteFile(ctx context.Context, file string, content io.ReadSeeker) error {
func (c *localClient) WriteFile(ctx context.Context, file string, content io.Reader) error {
return c.localStorage.WriteFile(file, content)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ func (es *generatorExternalStorage) Size(ctx context.Context, basename string) (
return int64(es.gen.size), nil
}

func (es *generatorExternalStorage) WriteFile(
ctx context.Context, basename string, content io.ReadSeeker,
) error {
return errors.New("unsupported")
func (es *generatorExternalStorage) Writer(
ctx context.Context, basename string,
) (cloud.WriteCloserWithError, error) {
return nil, errors.New("unsupported")
}

func (es *generatorExternalStorage) ListFiles(ctx context.Context, _ string) ([]string, error) {
Expand Down
40 changes: 18 additions & 22 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"io"
"net/url"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
Expand All @@ -42,8 +41,7 @@ var _ copyMachineInterface = &fileUploadMachine{}

type fileUploadMachine struct {
c *copyMachine
writeToFile *io.PipeWriter
wg *sync.WaitGroup
w cloud.WriteCloserWithError
failureCleanup func()
}

Expand Down Expand Up @@ -94,8 +92,7 @@ func newFileUploadMachine(
p: planner{execCfg: execCfg, alloc: &rowenc.DatumAlloc{}},
}
f = &fileUploadMachine{
c: c,
wg: &sync.WaitGroup{},
c: c,
}

// We need a planner to do the initial planning, even if a planner
Expand Down Expand Up @@ -124,7 +121,6 @@ func newFileUploadMachine(
return nil, err
}

pr, pw := io.Pipe()
store, err := c.p.execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, dest, c.p.User())
if err != nil {
return nil, err
Expand All @@ -135,15 +131,11 @@ func newFileUploadMachine(
return nil, err
}

f.wg.Add(1)
go func() {
err := cloud.WriteFile(ctx, store, "", &noopReadSeeker{pr})
if err != nil {
_ = pr.CloseWithError(err)
}
f.wg.Done()
}()
f.writeToFile = pw
f.w, err = store.Writer(ctx, "")
if err != nil {
return nil, err
}

f.failureCleanup = func() {
// Ignoring this error because deletion would only fail
// if the file was not created in the first place.
Expand Down Expand Up @@ -174,20 +166,24 @@ func CopyInFileStmt(destination, schema, table string) string {
)
}

func (f *fileUploadMachine) run(ctx context.Context) (err error) {
err = f.c.run(ctx)
_ = f.writeToFile.Close()
func (f *fileUploadMachine) run(ctx context.Context) error {
err := f.c.run(ctx)
if err != nil {
err = errors.CombineErrors(err, f.w.CloseWithError(err))
} else {
err = f.w.Close()
}

if err != nil {
f.failureCleanup()
}
f.wg.Wait()
return
return err
}

func (f *fileUploadMachine) writeFile(ctx context.Context) error {
for _, r := range f.c.rows {
b := []byte(*r[0].(*tree.DBytes))
n, err := f.writeToFile.Write(b)
n, err := f.w.Write(b)
if err != nil {
return err
}
Expand All @@ -197,7 +193,7 @@ func (f *fileUploadMachine) writeFile(ctx context.Context) error {
}

// Issue a final zero-byte write to ensure we observe any errors in the pipe.
_, err := f.writeToFile.Write(nil)
_, err := f.w.Write(nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/sysutil",
Expand Down
65 changes: 34 additions & 31 deletions pkg/storage/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func s3ErrDelay(err error) time.Duration {
return 0
}

func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) {
func (s *s3Storage) newSession(ctx context.Context) (*session.Session, error) {
sess, err := session.NewSessionWithOptions(s.opts)
if err != nil {
return nil, errors.Wrap(err, "new aws session")
Expand All @@ -259,6 +259,14 @@ func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) {
}
}
sess.Config.Region = aws.String(s.conf.Region)
return sess, err
}

func (s *s3Storage) newS3Client(ctx context.Context) (*s3.S3, error) {
sess, err := s.newSession(ctx)
if err != nil {
return nil, err
}
return s3.New(sess), nil
}

Expand All @@ -277,39 +285,27 @@ func (s *s3Storage) Settings() *cluster.Settings {
return s.settings
}

func (s *s3Storage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error {
client, err := s.newS3Client(ctx)
func (s *s3Storage) Writer(
ctx context.Context, basename string,
) (cloud.WriteCloserWithError, error) {
sess, err := s.newSession(ctx)
if err != nil {
return err
return nil, err
}
err = contextutil.RunWithTimeout(ctx, "put s3 object",
cloud.Timeout.Get(&s.settings.SV),
func(ctx context.Context) error {
putObjectInput := s3.PutObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
Body: content,
}

// If a server side encryption mode is provided in the URI, we must set
// the header values to enable SSE before writing the file to the s3
// bucket.
if s.conf.ServerEncMode != "" {
switch s.conf.ServerEncMode {
case string(aes256Enc):
putObjectInput.SetServerSideEncryption(s.conf.ServerEncMode)
case string(kmsEnc):
putObjectInput.SetServerSideEncryption(s.conf.ServerEncMode)
putObjectInput.SetSSEKMSKeyId(s.conf.ServerKMSID)
default:
return errors.Newf("unsupported server encryption mode %s. "+
"Supported values are `aws:kms` and `AES256`.", s.conf.ServerEncMode)
}
}
_, err := client.PutObjectWithContext(ctx, &putObjectInput)
return err
uploader := s3manager.NewUploader(sess)

return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
// Upload the file to S3.
// TODO(dt): test and tune the uploader parameters.
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
Body: r,
ServerSideEncryption: nilIfEmpty(s.conf.ServerEncMode),
SSEKMSKeyId: nilIfEmpty(s.conf.ServerKMSID),
})
return errors.Wrap(err, "failed to put s3 object")
return errors.Wrap(err, "upload failed")
}), nil
}

func (s *s3Storage) openStreamAt(
Expand Down Expand Up @@ -481,6 +477,13 @@ func (s *s3Storage) Close() error {
return nil
}

func nilIfEmpty(s string) *string {
if s == "" {
return nil
}
return aws.String(s)
}

func init() {
cloud.RegisterExternalStorageProvider(roachpb.ExternalStorageProvider_s3,
parseS3URL, MakeS3Storage, cloud.RedactedParams(AWSSecretParam, AWSTempTokenParam), "s3")
Expand Down
25 changes: 12 additions & 13 deletions pkg/storage/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,18 @@ func (s *azureStorage) Settings() *cluster.Settings {
return s.settings
}

func (s *azureStorage) WriteFile(
ctx context.Context, basename string, content io.ReadSeeker,
) error {
err := contextutil.RunWithTimeout(ctx, "write azure file", cloud.Timeout.Get(&s.settings.SV),
func(ctx context.Context) error {
blob := s.getBlob(basename)
_, err := blob.Upload(
ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{},
azblob.DefaultAccessTier, nil /* blobTagsMap */, azblob.ClientProvidedKeyOptions{},
)
return err
})
return errors.Wrapf(err, "write file: %s", basename)
func (s *azureStorage) Writer(
ctx context.Context, basename string,
) (cloud.WriteCloserWithError, error) {
blob := s.getBlob(basename)
return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
_, err := azblob.UploadStreamToBlockBlob(
ctx, r, blob, azblob.UploadStreamToBlockBlobOptions{
BufferSize: 4 << 20,
},
)
return err
}), nil
}

// ReadFile is shorthand for ReadFileAt with offset 0.
Expand Down
56 changes: 54 additions & 2 deletions pkg/storage/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
Expand Down Expand Up @@ -252,8 +253,59 @@ func CheckHTTPContentRangeHeader(h string, pos int64) (int64, error) {
return size, nil
}

// BackgroundPipe is a helper for providing a Writer that is backed by a pipe
// that has a background process reading from it. It *must* be Closed().
func BackgroundPipe(
ctx context.Context, fn func(ctx context.Context, pr io.Reader) error,
) WriteCloserWithError {
pr, pw := io.Pipe()
w := &backgroundPipe{w: pw, grp: ctxgroup.WithContext(ctx)}
w.grp.GoCtx(func(ctc context.Context) error {
err := fn(ctx, pr)
if err != nil {
closeErr := pr.CloseWithError(err)
err = errors.CombineErrors(err, closeErr)
} else {
err = pr.Close()
}
return err
})
return w
}

type backgroundPipe struct {
w *io.PipeWriter
grp ctxgroup.Group
}

// Write writes to the writer.
func (s *backgroundPipe) Write(p []byte) (int, error) {
return s.w.Write(p)
}

// Close closes the writer, finishing the write operation.
func (s *backgroundPipe) Close() error {
err := s.w.Close()
return errors.CombineErrors(err, s.grp.Wait())
}

// CloseWithError closes the Writer with an error, which may discard prior
// writes and abort the overall write operation.
func (s *backgroundPipe) CloseWithError(err error) error {
e := s.w.CloseWithError(err)
return errors.CombineErrors(e, s.grp.Wait())
}

// WriteFile is a helper for writing the content of a Reader to the given path
// of an ExternalStorage.
func WriteFile(ctx context.Context, dest ExternalStorage, basename string, src io.ReadSeeker) error {
return dest.WriteFile(ctx, basename, src)
func WriteFile(ctx context.Context, dest ExternalStorage, basename string, src io.Reader) error {
w, err := dest.Writer(ctx, basename)
if err != nil {
return errors.Wrap(err, "opening object for writing")
}
if _, err := io.Copy(w, src); err != nil {
closeErr := w.CloseWithError(err)
return errors.CombineErrors(err, closeErr)
}
return errors.Wrap(w.Close(), "closing object")
}
14 changes: 12 additions & 2 deletions pkg/storage/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
// This file is for interfaces only and should not contain any implementation
// code. All concrete implementations should be added to pkg/storage/cloudimpl.

// WriteCloserWithError extends WriteCloser with an extra CloseWithError func.
type WriteCloserWithError interface {
io.WriteCloser
// CloseWithError closes the writer with an error, which may choose to abort
// rather than complete any write operations.
CloseWithError(error) error
}

// ExternalStorage provides an API to read and write files in some storage,
// namely various cloud storage providers, for example to store backups.
// Generally an implementation is instantiated pointing to some base path or
Expand Down Expand Up @@ -65,8 +73,10 @@ type ExternalStorage interface {
// This can be leveraged for an existence check.
ReadFileAt(ctx context.Context, basename string, offset int64) (io.ReadCloser, int64, error)

// WriteFile should write the content to requested name.
WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error
// Writer returns a writer for the requested name. The returned writer must
// be closed by the caller to complete the write, or can be closed with an
// error which may, depending on the implementation, abort the write.
Writer(ctx context.Context, basename string) (WriteCloserWithError, error)

// ListFiles returns files that match a globs-style pattern. The returned
// results are usually relative to the base path, meaning an ExternalStorage
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/cloud/gcp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ go_library(
"//pkg/settings/cluster",
"//pkg/storage/cloud",
"//pkg/util/contextutil",
"//pkg/util/retry",
"@com_github_cockroachdb_errors//:errors",
"@com_google_cloud_go_storage//:storage",
"@org_golang_google_api//iterator",
Expand Down
Loading

0 comments on commit ac52981

Please sign in to comment.