Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/cloud: replace WriteFile(Reader) with Writer #65057

Merged
merged 2 commits into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions pkg/blobs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BlobClient interface {
// WriteFile sends the named payload to the requested node.
// This method will read entire content of file and send
// it over to another node, based on the nodeID.
WriteFile(ctx context.Context, file string, content io.ReadSeeker) error
WriteFile(ctx context.Context, file string, content io.Reader) error

// List lists the corresponding filenames from the requested node.
// The requested node can be the current node.
Expand Down Expand Up @@ -75,9 +75,7 @@ func (c *remoteClient) ReadFile(
return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file")
}

func (c *remoteClient) WriteFile(
ctx context.Context, file string, content io.ReadSeeker,
) (err error) {
func (c *remoteClient) WriteFile(ctx context.Context, file string, content io.Reader) (err error) {
ctx = metadata.AppendToOutgoingContext(ctx, "filename", file)
stream, err := c.blobClient.PutStream(ctx)
if err != nil {
Expand Down Expand Up @@ -143,7 +141,7 @@ func (c *localClient) ReadFile(
return c.localStorage.ReadFile(file, offset)
}

func (c *localClient) WriteFile(ctx context.Context, file string, content io.ReadSeeker) error {
func (c *localClient) WriteFile(ctx context.Context, file string, content io.Reader) error {
return c.localStorage.WriteFile(file, content)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
storage, err := externalStorageFromURI(ctx, uri, security.RootUserName())
defer storage.Close()
require.NoError(t, err)
require.NoError(t, storage.WriteFile(ctx, backupManifestName, emptyReader))
require.NoError(t, cloud.WriteFile(ctx, storage, backupManifestName, emptyReader))
}

// writeLatest writes latestBackupSuffix to the LATEST file in the given
Expand All @@ -72,7 +72,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
storage, err := externalStorageFromURI(ctx, collectionURI, security.RootUserName())
defer storage.Close()
require.NoError(t, err)
require.NoError(t, storage.WriteFile(ctx, latestFileName, bytes.NewReader([]byte(latestBackupSuffix))))
require.NoError(t, cloud.WriteFile(ctx, storage, latestFileName, bytes.NewReader([]byte(latestBackupSuffix))))
}

// localizeURI returns a slice of just the base URI if localities is nil.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}
defer c.Close()
if err := c.WriteFile(ctx, latestFileName, strings.NewReader(suffix)); err != nil {
if err := cloud.WriteFile(ctx, c, latestFileName, strings.NewReader(suffix)); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func writeFile(
exportStore = localitySpecificStore
}

if err := exportStore.WriteFile(ctx, file.Path, bytes.NewReader(data)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, file.Path, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
return errors.Wrap(err, "writing SST")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func writeBackupManifest(
}
}

if err := exportStore.WriteFile(ctx, filename, bytes.NewReader(descBuf)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf)); err != nil {
return err
}

Expand All @@ -400,7 +400,7 @@ func writeBackupManifest(
if err != nil {
return errors.Wrap(err, "calculating checksum")
}
if err := exportStore.WriteFile(ctx, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil {
return errors.Wrap(err, "writing manifest checksum")
}

Expand Down Expand Up @@ -487,7 +487,7 @@ func writeBackupPartitionDescriptor(
}
}

return exportStore.WriteFile(ctx, filename, bytes.NewReader(descBuf))
return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf))
}

// writeTableStatistics writes a StatsTable object to a file of the filename
Expand Down Expand Up @@ -515,7 +515,7 @@ func writeTableStatistics(
return err
}
}
return exportStore.WriteFile(ctx, filename, bytes.NewReader(statsBuf))
return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(statsBuf))
}

func loadBackupManifests(
Expand Down Expand Up @@ -951,7 +951,7 @@ func writeEncryptionInfoIfNotExists(
if err != nil {
return err
}
if err := dest.WriteFile(ctx, backupEncryptionInfoFile, bytes.NewReader(buf)); err != nil {
if err := cloud.WriteFile(ctx, dest, backupEncryptionInfoFile, bytes.NewReader(buf)); err != nil {
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *cloudStorageSink) EmitResolvedTimestamp(
if log.V(1) {
log.Infof(ctx, "writing file %s %s", filename, resolved.AsOfSystemTime())
}
return s.es.WriteFile(ctx, filepath.Join(part, filename), bytes.NewReader(payload))
return cloud.WriteFile(ctx, s.es, filepath.Join(part, filename), bytes.NewReader(payload))
}

// flushTopicVersions flushes all open files for the provided topic up to and
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
"precedes a file emitted before: %s", filename, s.prevFilename)
}
s.prevFilename = filename
if err := s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func showData(
if err != nil {
return errors.Wrapf(err, "unable to open store to write files: %s", debugBackupArgs.destination)
}
if err = store.WriteFile(ctx, file, bytes.NewReader(buf.Bytes())); err != nil {
if err = cloud.WriteFile(ctx, store, file, bytes.NewReader(buf.Bytes())); err != nil {
_ = store.Close()
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (sp *csvWriter) Run(ctx context.Context) {

size := writer.Len()

if err := es.WriteFile(ctx, filename, bytes.NewReader(writer.Bytes())); err != nil {
if err := cloud.WriteFile(ctx, es, filename, bytes.NewReader(writer.Bytes())); err != nil {
return err
}
res := rowenc.EncDatumRow{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ func (u *unsupportedStmtLogger) flush() error {
} else {
logFileName = path.Join(logFileName, pgDumpUnsupportedSchemaStmtLog, fmt.Sprintf("%d.log", u.flushCount))
}
err = s.WriteFile(u.ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
err = cloud.WriteFile(u.ctx, s, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,7 @@ b STRING) CSV DATA (%s)`, testFiles.files[0])); err != nil {
require.NoError(t, err)

data := []byte("1,2")
require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data)))
require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data)))

sqlDB.Exec(t, fmt.Sprintf("IMPORT TABLE foo (id INT PRIMARY KEY, "+
"id2 INT) CSV DATA ('%s')", userfileURI))
Expand All @@ -2302,7 +2302,7 @@ b STRING) CSV DATA (%s)`, testFiles.files[0])); err != nil {
require.NoError(t, err)

data := []byte("1,2")
require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data)))
require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data)))

sqlDB.Exec(t, fmt.Sprintf("IMPORT TABLE baz (id INT PRIMARY KEY, "+
"id2 INT) CSV DATA ('%s')", userfileURI))
Expand Down Expand Up @@ -2412,7 +2412,7 @@ func TestImportObjectLevelRBAC(t *testing.T) {
fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{},
cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.TestUserName(), ie, tc.Server(0).DB())
require.NoError(t, err)
require.NoError(t, fileTableSystem1.WriteFile(ctx, filename, bytes.NewReader([]byte("1,aaa"))))
require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte("1,aaa"))))
}

t.Run("import-RBAC", func(t *testing.T) {
Expand Down Expand Up @@ -3440,7 +3440,7 @@ func TestImportIntoCSV(t *testing.T) {
require.NoError(t, err)

data := []byte("1,2")
require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data)))
require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data)))

sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, id2 INT)")
sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO foo (id, id2) CSV DATA ('%s')", userfileURI))
Expand Down Expand Up @@ -3507,7 +3507,7 @@ func benchUserUpload(b *testing.B, uploadBaseURI string) {
require.NoError(b, err)
content, err := ioutil.ReadAll(r)
require.NoError(b, err)
err = userfileStorage.WriteFile(ctx, "", bytes.NewReader(content))
err = cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(content))
require.NoError(b, err)
numBytes = int64(len(content))
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func readInputFiles(
return err
}
defer rejectedStorage.Close()
if err := rejectedStorage.WriteFile(ctx, "", bytes.NewReader(buf)); err != nil {
if err := cloud.WriteFile(ctx, rejectedStorage, "", bytes.NewReader(buf)); err != nil {
return err
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ func (es *generatorExternalStorage) Size(ctx context.Context, basename string) (
return int64(es.gen.size), nil
}

func (es *generatorExternalStorage) WriteFile(
ctx context.Context, basename string, content io.ReadSeeker,
) error {
return errors.New("unsupported")
func (es *generatorExternalStorage) Writer(
ctx context.Context, basename string,
) (cloud.WriteCloserWithError, error) {
return nil, errors.New("unsupported")
}

func (es *generatorExternalStorage) ListFiles(ctx context.Context, _ string) ([]string, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func evalExport(
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error {
// We blindly retry any error here because we expect the caller to have
// verified the target is writable before sending ExportRequests for it.
if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, exported.Path, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
return err
}
Expand Down
41 changes: 19 additions & 22 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"io"
"net/url"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
)
Expand All @@ -41,8 +41,7 @@ var _ copyMachineInterface = &fileUploadMachine{}

type fileUploadMachine struct {
c *copyMachine
writeToFile *io.PipeWriter
wg *sync.WaitGroup
w cloud.WriteCloserWithError
failureCleanup func()
}

Expand Down Expand Up @@ -93,8 +92,7 @@ func newFileUploadMachine(
p: planner{execCfg: execCfg, alloc: &rowenc.DatumAlloc{}},
}
f = &fileUploadMachine{
c: c,
wg: &sync.WaitGroup{},
c: c,
}

// We need a planner to do the initial planning, even if a planner
Expand Down Expand Up @@ -123,7 +121,6 @@ func newFileUploadMachine(
return nil, err
}

pr, pw := io.Pipe()
store, err := c.p.execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, dest, c.p.User())
if err != nil {
return nil, err
Expand All @@ -134,15 +131,11 @@ func newFileUploadMachine(
return nil, err
}

f.wg.Add(1)
go func() {
err := store.WriteFile(ctx, "", &noopReadSeeker{pr})
if err != nil {
_ = pr.CloseWithError(err)
}
f.wg.Done()
}()
f.writeToFile = pw
f.w, err = store.Writer(ctx, "")
if err != nil {
return nil, err
}

f.failureCleanup = func() {
// Ignoring this error because deletion would only fail
// if the file was not created in the first place.
Expand Down Expand Up @@ -173,20 +166,24 @@ func CopyInFileStmt(destination, schema, table string) string {
)
}

func (f *fileUploadMachine) run(ctx context.Context) (err error) {
err = f.c.run(ctx)
_ = f.writeToFile.Close()
func (f *fileUploadMachine) run(ctx context.Context) error {
err := f.c.run(ctx)
if err != nil {
err = errors.CombineErrors(err, f.w.CloseWithError(err))
} else {
err = f.w.Close()
}

if err != nil {
f.failureCleanup()
}
f.wg.Wait()
return
return err
}

func (f *fileUploadMachine) writeFile(ctx context.Context) error {
for _, r := range f.c.rows {
b := []byte(*r[0].(*tree.DBytes))
n, err := f.writeToFile.Write(b)
n, err := f.w.Write(b)
if err != nil {
return err
}
Expand All @@ -196,7 +193,7 @@ func (f *fileUploadMachine) writeFile(ctx context.Context) error {
}

// Issue a final zero-byte write to ensure we observe any errors in the pipe.
_, err := f.writeToFile.Write(nil)
_, err := f.w.Write(nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/sysutil",
Expand Down
Loading