Skip to content

Commit

Permalink
Merge #65057 #65110
Browse files Browse the repository at this point in the history
65057: storage/cloud: replace WriteFile(Reader) with Writer r=dt a=dt

This changes the ExternalStorage API's writing method. Previously, WriteFile took an io.Reader
and wrote its content to the requested destination, returning any error encountered while doing
so. It is replaced by Writer(), which returns an io.Writer pointing to the destination; the
writer can be written to later and then closed to finish the upload (or CloseWithError'ed to cancel it).

All existing callers use the shim and are unaffected, but can later choose to change to a
push-based model and use the Writer directly. This is left to a follow-up change.

(note: first commit is just adding a shim for existing callers and switching them)

Release note: none.

65110: kv/kvserver: skip TestRejectedLeaseDoesntDictateClosedTimestamp r=RaduBerinde a=RaduBerinde

Refs: #65109

Reason: flaky test

Generated by bin/skip-test.

Release justification: non-production code changes

Release note: None

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
3 people committed May 13, 2021
3 parents 5817ba2 + ac52981 + cb6934e commit 964a2ad
Show file tree
Hide file tree
Showing 31 changed files with 220 additions and 183 deletions.
8 changes: 3 additions & 5 deletions pkg/blobs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BlobClient interface {
// WriteFile sends the named payload to the requested node.
// This method will read entire content of file and send
// it over to another node, based on the nodeID.
WriteFile(ctx context.Context, file string, content io.ReadSeeker) error
WriteFile(ctx context.Context, file string, content io.Reader) error

// List lists the corresponding filenames from the requested node.
// The requested node can be the current node.
Expand Down Expand Up @@ -75,9 +75,7 @@ func (c *remoteClient) ReadFile(
return newGetStreamReader(stream), st.Filesize, errors.Wrap(err, "fetching file")
}

func (c *remoteClient) WriteFile(
ctx context.Context, file string, content io.ReadSeeker,
) (err error) {
func (c *remoteClient) WriteFile(ctx context.Context, file string, content io.Reader) (err error) {
ctx = metadata.AppendToOutgoingContext(ctx, "filename", file)
stream, err := c.blobClient.PutStream(ctx)
if err != nil {
Expand Down Expand Up @@ -143,7 +141,7 @@ func (c *localClient) ReadFile(
return c.localStorage.ReadFile(file, offset)
}

func (c *localClient) WriteFile(ctx context.Context, file string, content io.ReadSeeker) error {
func (c *localClient) WriteFile(ctx context.Context, file string, content io.Reader) error {
return c.localStorage.WriteFile(file, content)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
storage, err := externalStorageFromURI(ctx, uri, security.RootUserName())
defer storage.Close()
require.NoError(t, err)
require.NoError(t, storage.WriteFile(ctx, backupManifestName, emptyReader))
require.NoError(t, cloud.WriteFile(ctx, storage, backupManifestName, emptyReader))
}

// writeLatest writes latestBackupSuffix to the LATEST file in the given
Expand All @@ -72,7 +72,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
storage, err := externalStorageFromURI(ctx, collectionURI, security.RootUserName())
defer storage.Close()
require.NoError(t, err)
require.NoError(t, storage.WriteFile(ctx, latestFileName, bytes.NewReader([]byte(latestBackupSuffix))))
require.NoError(t, cloud.WriteFile(ctx, storage, latestFileName, bytes.NewReader([]byte(latestBackupSuffix))))
}

// localizeURI returns a slice of just the base URI if localities is nil.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}
defer c.Close()
if err := c.WriteFile(ctx, latestFileName, strings.NewReader(suffix)); err != nil {
if err := cloud.WriteFile(ctx, c, latestFileName, strings.NewReader(suffix)); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func writeFile(
exportStore = localitySpecificStore
}

if err := exportStore.WriteFile(ctx, file.Path, bytes.NewReader(data)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, file.Path, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
return errors.Wrap(err, "writing SST")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/backupccl/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func writeBackupManifest(
}
}

if err := exportStore.WriteFile(ctx, filename, bytes.NewReader(descBuf)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf)); err != nil {
return err
}

Expand All @@ -400,7 +400,7 @@ func writeBackupManifest(
if err != nil {
return errors.Wrap(err, "calculating checksum")
}
if err := exportStore.WriteFile(ctx, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil {
return errors.Wrap(err, "writing manifest checksum")
}

Expand Down Expand Up @@ -487,7 +487,7 @@ func writeBackupPartitionDescriptor(
}
}

return exportStore.WriteFile(ctx, filename, bytes.NewReader(descBuf))
return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf))
}

// writeTableStatistics writes a StatsTable object to a file of the filename
Expand Down Expand Up @@ -515,7 +515,7 @@ func writeTableStatistics(
return err
}
}
return exportStore.WriteFile(ctx, filename, bytes.NewReader(statsBuf))
return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(statsBuf))
}

func loadBackupManifests(
Expand Down Expand Up @@ -951,7 +951,7 @@ func writeEncryptionInfoIfNotExists(
if err != nil {
return err
}
if err := dest.WriteFile(ctx, backupEncryptionInfoFile, bytes.NewReader(buf)); err != nil {
if err := cloud.WriteFile(ctx, dest, backupEncryptionInfoFile, bytes.NewReader(buf)); err != nil {
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *cloudStorageSink) EmitResolvedTimestamp(
if log.V(1) {
log.Infof(ctx, "writing file %s %s", filename, resolved.AsOfSystemTime())
}
return s.es.WriteFile(ctx, filepath.Join(part, filename), bytes.NewReader(payload))
return cloud.WriteFile(ctx, s.es, filepath.Join(part, filename), bytes.NewReader(payload))
}

// flushTopicVersions flushes all open files for the provided topic up to and
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
"precedes a file emitted before: %s", filename, s.prevFilename)
}
s.prevFilename = filename
if err := s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func showData(
if err != nil {
return errors.Wrapf(err, "unable to open store to write files: %s", debugBackupArgs.destination)
}
if err = store.WriteFile(ctx, file, bytes.NewReader(buf.Bytes())); err != nil {
if err = cloud.WriteFile(ctx, store, file, bytes.NewReader(buf.Bytes())); err != nil {
_ = store.Close()
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (sp *csvWriter) Run(ctx context.Context) {

size := writer.Len()

if err := es.WriteFile(ctx, filename, bytes.NewReader(writer.Bytes())); err != nil {
if err := cloud.WriteFile(ctx, es, filename, bytes.NewReader(writer.Bytes())); err != nil {
return err
}
res := rowenc.EncDatumRow{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ func (u *unsupportedStmtLogger) flush() error {
} else {
logFileName = path.Join(logFileName, pgDumpUnsupportedSchemaStmtLog, fmt.Sprintf("%d.log", u.flushCount))
}
err = s.WriteFile(u.ctx, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
err = cloud.WriteFile(u.ctx, s, logFileName, bytes.NewReader(u.logBuffer.Bytes()))
if err != nil {
return errors.Wrap(err, "failed to log unsupported stmts to log during IMPORT PGDUMP")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,7 @@ b STRING) CSV DATA (%s)`, testFiles.files[0])); err != nil {
require.NoError(t, err)

data := []byte("1,2")
require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data)))
require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data)))

sqlDB.Exec(t, fmt.Sprintf("IMPORT TABLE foo (id INT PRIMARY KEY, "+
"id2 INT) CSV DATA ('%s')", userfileURI))
Expand All @@ -2302,7 +2302,7 @@ b STRING) CSV DATA (%s)`, testFiles.files[0])); err != nil {
require.NoError(t, err)

data := []byte("1,2")
require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data)))
require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data)))

sqlDB.Exec(t, fmt.Sprintf("IMPORT TABLE baz (id INT PRIMARY KEY, "+
"id2 INT) CSV DATA ('%s')", userfileURI))
Expand Down Expand Up @@ -2412,7 +2412,7 @@ func TestImportObjectLevelRBAC(t *testing.T) {
fileTableSystem1, err := cloud.ExternalStorageFromURI(ctx, dest, base.ExternalIODirConfig{},
cluster.NoSettings, blobs.TestEmptyBlobClientFactory, security.TestUserName(), ie, tc.Server(0).DB())
require.NoError(t, err)
require.NoError(t, fileTableSystem1.WriteFile(ctx, filename, bytes.NewReader([]byte("1,aaa"))))
require.NoError(t, cloud.WriteFile(ctx, fileTableSystem1, filename, bytes.NewReader([]byte("1,aaa"))))
}

t.Run("import-RBAC", func(t *testing.T) {
Expand Down Expand Up @@ -3440,7 +3440,7 @@ func TestImportIntoCSV(t *testing.T) {
require.NoError(t, err)

data := []byte("1,2")
require.NoError(t, userfileStorage.WriteFile(ctx, "", bytes.NewReader(data)))
require.NoError(t, cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(data)))

sqlDB.Exec(t, "CREATE TABLE foo (id INT PRIMARY KEY, id2 INT)")
sqlDB.Exec(t, fmt.Sprintf("IMPORT INTO foo (id, id2) CSV DATA ('%s')", userfileURI))
Expand Down Expand Up @@ -3507,7 +3507,7 @@ func benchUserUpload(b *testing.B, uploadBaseURI string) {
require.NoError(b, err)
content, err := ioutil.ReadAll(r)
require.NoError(b, err)
err = userfileStorage.WriteFile(ctx, "", bytes.NewReader(content))
err = cloud.WriteFile(ctx, userfileStorage, "", bytes.NewReader(content))
require.NoError(b, err)
numBytes = int64(len(content))
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func readInputFiles(
return err
}
defer rejectedStorage.Close()
if err := rejectedStorage.WriteFile(ctx, "", bytes.NewReader(buf)); err != nil {
if err := cloud.WriteFile(ctx, rejectedStorage, "", bytes.NewReader(buf)); err != nil {
return err
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ func (es *generatorExternalStorage) Size(ctx context.Context, basename string) (
return int64(es.gen.size), nil
}

func (es *generatorExternalStorage) WriteFile(
ctx context.Context, basename string, content io.ReadSeeker,
) error {
return errors.New("unsupported")
func (es *generatorExternalStorage) Writer(
ctx context.Context, basename string,
) (cloud.WriteCloserWithError, error) {
return nil, errors.New("unsupported")
}

func (es *generatorExternalStorage) ListFiles(ctx context.Context, _ string) ([]string, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func evalExport(
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error {
// We blindly retry any error here because we expect the caller to have
// verified the target is writable before sending ExportRequests for it.
if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
if err := cloud.WriteFile(ctx, exportStore, exported.Path, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_closedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -449,6 +450,7 @@ func TestBumpSideTransportClosed(t *testing.T) {
// 6, the lease proposal doesn't bump the assignedClosedTimestamp.
func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 65109, "flaky test")
defer log.Scope(t).Close(t)
ctx := context.Background()

Expand Down
41 changes: 19 additions & 22 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"io"
"net/url"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
)
Expand All @@ -41,8 +41,7 @@ var _ copyMachineInterface = &fileUploadMachine{}

type fileUploadMachine struct {
c *copyMachine
writeToFile *io.PipeWriter
wg *sync.WaitGroup
w cloud.WriteCloserWithError
failureCleanup func()
}

Expand Down Expand Up @@ -93,8 +92,7 @@ func newFileUploadMachine(
p: planner{execCfg: execCfg, alloc: &rowenc.DatumAlloc{}},
}
f = &fileUploadMachine{
c: c,
wg: &sync.WaitGroup{},
c: c,
}

// We need a planner to do the initial planning, even if a planner
Expand Down Expand Up @@ -123,7 +121,6 @@ func newFileUploadMachine(
return nil, err
}

pr, pw := io.Pipe()
store, err := c.p.execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, dest, c.p.User())
if err != nil {
return nil, err
Expand All @@ -134,15 +131,11 @@ func newFileUploadMachine(
return nil, err
}

f.wg.Add(1)
go func() {
err := store.WriteFile(ctx, "", &noopReadSeeker{pr})
if err != nil {
_ = pr.CloseWithError(err)
}
f.wg.Done()
}()
f.writeToFile = pw
f.w, err = store.Writer(ctx, "")
if err != nil {
return nil, err
}

f.failureCleanup = func() {
// Ignoring this error because deletion would only fail
// if the file was not created in the first place.
Expand Down Expand Up @@ -173,20 +166,24 @@ func CopyInFileStmt(destination, schema, table string) string {
)
}

func (f *fileUploadMachine) run(ctx context.Context) (err error) {
err = f.c.run(ctx)
_ = f.writeToFile.Close()
func (f *fileUploadMachine) run(ctx context.Context) error {
err := f.c.run(ctx)
if err != nil {
err = errors.CombineErrors(err, f.w.CloseWithError(err))
} else {
err = f.w.Close()
}

if err != nil {
f.failureCleanup()
}
f.wg.Wait()
return
return err
}

func (f *fileUploadMachine) writeFile(ctx context.Context) error {
for _, r := range f.c.rows {
b := []byte(*r[0].(*tree.DBytes))
n, err := f.writeToFile.Write(b)
n, err := f.w.Write(b)
if err != nil {
return err
}
Expand All @@ -196,7 +193,7 @@ func (f *fileUploadMachine) writeFile(ctx context.Context) error {
}

// Issue a final zero-byte write to ensure we observe any errors in the pipe.
_, err := f.writeToFile.Write(nil)
_, err := f.w.Write(nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/sysutil",
Expand Down
Loading

0 comments on commit 964a2ad

Please sign in to comment.