Skip to content

Commit

Permalink
*: replace use of context.WithTimeout with a wrapper
Browse files Browse the repository at this point in the history
This patch replaces all usages of context.WithTimeout and
context.WithDeadline with contextutil.RunWithTimeout and
contextutil.RunWithDeadline.

Also, add a lint to prevent them from creeping back in.

Release note: None
  • Loading branch information
jordanlewis committed Feb 14, 2019
1 parent 986c9c8 commit 447fe7d
Show file tree
Hide file tree
Showing 20 changed files with 655 additions and 611 deletions.
10 changes: 4 additions & 6 deletions pkg/acceptance/cluster/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
Expand Down Expand Up @@ -385,16 +386,13 @@ func (cli resilientDockerClient) ContainerStart(
clientCtx context.Context, id string, opts types.ContainerStartOptions,
) error {
for {
err := func() error {
ctx, cancel := context.WithTimeout(clientCtx, 20*time.Second)
defer cancel()

err := contextutil.RunWithTimeout(clientCtx, "start container", 20*time.Second, func(ctx context.Context) error {
return cli.APIClient.ContainerStart(ctx, id, opts)
}()
})

// Keep going if ContainerStart timed out, but client's context is not
// expired.
if err == context.DeadlineExceeded && clientCtx.Err() == nil {
if errors.Cause(err) == context.DeadlineExceeded && clientCtx.Err() == nil {
log.Warningf(clientCtx, "ContainerStart timed out, retrying")
continue
}
Expand Down
151 changes: 91 additions & 60 deletions pkg/ccl/storageccl/export_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/pkg/errors"
Expand Down Expand Up @@ -442,24 +443,29 @@ func (h *httpStorage) ReadFile(ctx context.Context, basename string) (io.ReadClo
}

func (h *httpStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&h.settings.SV))
defer cancel()
_, err := h.reqNoBody(ctx, "PUT", basename, content)
return err
return contextutil.RunWithTimeout(ctx, fmt.Sprintf("PUT %s", basename),
timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error {
_, err := h.reqNoBody(ctx, "PUT", basename, content)
return err
})
}

func (h *httpStorage) Delete(ctx context.Context, basename string) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&h.settings.SV))
defer cancel()
_, err := h.reqNoBody(ctx, "DELETE", basename, nil)
return err
return contextutil.RunWithTimeout(ctx, fmt.Sprintf("DELETE %s", basename),
timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error {
_, err := h.reqNoBody(ctx, "DELETE", basename, nil)
return err
})
}

func (h *httpStorage) Size(ctx context.Context, basename string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&h.settings.SV))
defer cancel()
resp, err := h.reqNoBody(ctx, "HEAD", basename, nil)
if err != nil {
var resp *http.Response
if err := contextutil.RunWithTimeout(ctx, fmt.Sprintf("HEAD %s", basename),
timeoutSetting.Get(&h.settings.SV), func(ctx context.Context) error {
var err error
resp, err = h.reqNoBody(ctx, "HEAD", basename, nil)
return err
}); err != nil {
return 0, err
}
if resp.ContentLength < 0 {
Expand Down Expand Up @@ -617,13 +623,16 @@ func (s *s3Storage) Conf() roachpb.ExportStorage {
}

func (s *s3Storage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&s.settings.SV))
defer cancel()
_, err := s.s3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
Body: content,
})
err := contextutil.RunWithTimeout(ctx, "put s3 object",
timeoutSetting.Get(&s.settings.SV),
func(ctx context.Context) error {
_, err := s.s3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
Body: content,
})
return err
})
return errors.Wrap(err, "failed to put s3 object")
}

Expand All @@ -640,22 +649,29 @@ func (s *s3Storage) ReadFile(ctx context.Context, basename string) (io.ReadClose
}

func (s *s3Storage) Delete(ctx context.Context, basename string) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&s.settings.SV))
defer cancel()
_, err := s.s3.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
})
return err
return contextutil.RunWithTimeout(ctx, "delete s3 object",
timeoutSetting.Get(&s.settings.SV),
func(ctx context.Context) error {
_, err := s.s3.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
})
return err
})
}

func (s *s3Storage) Size(ctx context.Context, basename string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&s.settings.SV))
defer cancel()
out, err := s.s3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
})
var out *s3.HeadObjectOutput
err := contextutil.RunWithTimeout(ctx, "get s3 object header",
timeoutSetting.Get(&s.settings.SV),
func(ctx context.Context) error {
var err error
out, err = s.s3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: s.bucket,
Key: aws.String(path.Join(s.prefix, basename)),
})
return err
})
if err != nil {
return 0, errors.Wrap(err, "failed to get s3 object headers")
}
Expand Down Expand Up @@ -757,18 +773,19 @@ func makeGCSStorage(
func (g *gcsStorage) WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error {
const maxAttempts = 3
err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxAttempts, func() error {
// Set the timeout within the retry loop.
deadlineCtx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&g.settings.SV))
defer cancel()
if _, err := content.Seek(0, io.SeekStart); err != nil {
return err
}
w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(deadlineCtx)
if _, err := io.Copy(w, content); err != nil {
_ = w.Close()
return err
}
return w.Close()
// Set the timeout within the retry loop.
return contextutil.RunWithTimeout(ctx, "put gcs file", timeoutSetting.Get(&g.settings.SV),
func(ctx context.Context) error {
w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(ctx)
if _, err := io.Copy(w, content); err != nil {
_ = w.Close()
return err
}
return w.Close()
})
})
return errors.Wrap(err, "write to google cloud")
}
Expand All @@ -786,16 +803,22 @@ func (g *gcsStorage) ReadFile(ctx context.Context, basename string) (io.ReadClos
}

func (g *gcsStorage) Delete(ctx context.Context, basename string) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&g.settings.SV))
defer cancel()
return g.bucket.Object(path.Join(g.prefix, basename)).Delete(ctx)
return contextutil.RunWithTimeout(ctx, "delete gcs file",
timeoutSetting.Get(&g.settings.SV),
func(ctx context.Context) error {
return g.bucket.Object(path.Join(g.prefix, basename)).Delete(ctx)
})
}

func (g *gcsStorage) Size(ctx context.Context, basename string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&g.settings.SV))
defer cancel()
r, err := g.bucket.Object(path.Join(g.prefix, basename)).NewReader(ctx)
if err != nil {
var r *gcs.Reader
if err := contextutil.RunWithTimeout(ctx, "size gcs file",
timeoutSetting.Get(&g.settings.SV),
func(ctx context.Context) error {
var err error
r, err = g.bucket.Object(path.Join(g.prefix, basename)).NewReader(ctx)
return err
}); err != nil {
return 0, err
}
sz := r.Attrs.Size
Expand Down Expand Up @@ -855,10 +878,12 @@ func (s *azureStorage) Conf() roachpb.ExportStorage {
func (s *azureStorage) WriteFile(
ctx context.Context, basename string, content io.ReadSeeker,
) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&s.settings.SV))
defer cancel()
blob := s.getBlob(basename)
_, err := blob.Upload(ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
err := contextutil.RunWithTimeout(ctx, "write azure file", timeoutSetting.Get(&s.settings.SV),
func(ctx context.Context) error {
blob := s.getBlob(basename)
_, err := blob.Upload(ctx, content, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
return err
})
return errors.Wrapf(err, "write file: %s", basename)
}

Expand All @@ -874,18 +899,24 @@ func (s *azureStorage) ReadFile(ctx context.Context, basename string) (io.ReadCl
}

func (s *azureStorage) Delete(ctx context.Context, basename string) error {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&s.settings.SV))
defer cancel()
blob := s.getBlob(basename)
_, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
err := contextutil.RunWithTimeout(ctx, "delete azure file", timeoutSetting.Get(&s.settings.SV),
func(ctx context.Context) error {
blob := s.getBlob(basename)
_, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
return err
})
return errors.Wrap(err, "delete file")
}

func (s *azureStorage) Size(ctx context.Context, basename string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, timeoutSetting.Get(&s.settings.SV))
defer cancel()
blob := s.getBlob(basename)
props, err := blob.GetProperties(ctx, azblob.BlobAccessConditions{})
var props *azblob.BlobGetPropertiesResponse
err := contextutil.RunWithTimeout(ctx, "size azure file", timeoutSetting.Get(&s.settings.SV),
func(ctx context.Context) error {
blob := s.getBlob(basename)
var err error
props, err = blob.GetProperties(ctx, azblob.BlobAccessConditions{})
return err
})
if err != nil {
return 0, errors.Wrap(err, "get file properties")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,6 +2305,7 @@ writing ` + os.DevNull + `
debug/rangelog
debug/liveness
debug/settings
debug/reports/problemranges
debug/gossip/liveness
debug/gossip/network
debug/gossip/nodes
Expand Down Expand Up @@ -2335,7 +2336,6 @@ writing ` + os.DevNull + `
debug/nodes/1/ranges/18
debug/nodes/1/ranges/19
debug/nodes/1/ranges/20
debug/reports/problemranges
debug/schema/defaultdb@details
debug/schema/postgres@details
debug/schema/system@details
Expand Down
48 changes: 24 additions & 24 deletions pkg/cli/debug_synctest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/pkg/errors"
Expand Down Expand Up @@ -80,34 +81,33 @@ func (sn scriptNemesis) Off() error {
func runDebugSyncTest(cmd *cobra.Command, args []string) error {
// TODO(tschottdorf): make this a flag.
duration := 10 * time.Minute
return contextutil.RunWithTimeout(context.Background(), "synctest", duration, func(ctx context.Context) error {

ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

nem := scriptNemesis(args[1])
if err := nem.Off(); err != nil {
return errors.Wrap(err, "unable to disable nemesis at beginning of run")
}

var generation int
var lastSeq int64
for {
dir := filepath.Join(args[0], strconv.Itoa(generation))
curLastSeq, err := runSyncer(ctx, dir, lastSeq, nem)
if err != nil {
return err
nem := scriptNemesis(args[1])
if err := nem.Off(); err != nil {
return errors.Wrap(err, "unable to disable nemesis at beginning of run")
}
lastSeq = curLastSeq
if curLastSeq == 0 {
if ctx.Err() != nil {
// Clean shutdown.
return nil

var generation int
var lastSeq int64
for {
dir := filepath.Join(args[0], strconv.Itoa(generation))
curLastSeq, err := runSyncer(ctx, dir, lastSeq, nem)
if err != nil {
return err
}
lastSeq = curLastSeq
if curLastSeq == 0 {
if ctx.Err() != nil {
// Clean shutdown.
return nil
}
// RocksDB dir got corrupted.
generation++
continue
}
// RocksDB dir got corrupted.
generation++
continue
}
}
})
}

type nemesisI interface {
Expand Down
Loading

0 comments on commit 447fe7d

Please sign in to comment.