Retry storage access on internal errors #180

Closed · wants to merge 12 commits
3 changes: 2 additions & 1 deletion elan/main.go
@@ -17,6 +17,7 @@ var opts = struct {
 	Parallelism        int            `long:"parallelism" default:"50" description:"Maximum number of in-flight parallel requests to the backend storage layer"`
 	DirCacheSize       int64          `long:"dir_cache_size" default:"10240" description:"Number of directory entries to cache for GetTree"`
 	KnownBlobCacheSize flags.ByteSize `long:"known_blob_cache_size" description:"Max size of known blob cache (in approximate bytes)"`
+	StorageRetries     int            `long:"storage_retries" description:"Max number of retries when accessing storage" default:"5"`
 	Admin              cli.AdminOpts  `group:"Options controlling HTTP admin server" namespace:"admin"`
 }{
 	Usage: `
@@ -34,5 +35,5 @@ modes are intended for testing only.
 func main() {
 	_, info := cli.ParseFlagsOrDie("Elan", &opts, &opts.Logging)
 	go cli.ServeAdmin(opts.Admin, info)
-	rpc.ServeForever(opts.GRPC, opts.Storage, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize))
+	rpc.ServeForever(opts.GRPC, opts.Storage, opts.StorageRetries, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize))
 }
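
The new --storage_retries flag defaults to 5, and (as the storage.go diff below shows) a value of 0 disables the retry wrapper entirely. A minimal standalone sketch of how such a tag parses, assuming the jessevdk/go-flags API; the one-field struct here is an illustrative stand-in, not the real opts:

package main

import (
	"fmt"

	flags "github.com/jessevdk/go-flags"
)

// One-field stand-in for the opts struct above, keeping only the new flag.
var opts struct {
	StorageRetries int `long:"storage_retries" description:"Max number of retries when accessing storage" default:"5"`
}

func main() {
	// With no arguments the default of 5 applies.
	if _, err := flags.ParseArgs(&opts, nil); err != nil {
		panic(err)
	}
	fmt.Println(opts.StorageRetries) // 5

	// An explicit value overrides it; 0 turns retries off.
	if _, err := flags.ParseArgs(&opts, []string{"--storage_retries=0"}); err != nil {
		panic(err)
	}
	fmt.Println(opts.StorageRetries) // 0
}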
2 changes: 1 addition & 1 deletion elan/rpc/client.go
@@ -19,7 +19,7 @@ func New(url string, tls bool, tokenFile string) (Client, error) {
 	// We can't use url.Parse here because it tends to put too much into the scheme (e.g. example.org:8080 -> scheme:example.org)
 	if strings.Contains(url, "://") {
 		return &elanClient{
-			s:       createServer(url, 8, 10240, 10*1024*1024),
+			s:       createServer(url, 5, 8, 10240, 10*1024*1024),
 			timeout: 1 * time.Minute,
 		}, nil
 	}
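
The url.Parse caveat in the comment above is easy to reproduce with a tiny standalone check (the address is just an example):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Everything before the first colon parses as the scheme, so a bare
	// host:port ends up with the host in Scheme and the port in Opaque.
	u, err := url.Parse("example.org:8080")
	fmt.Println(u.Scheme, u.Opaque, err) // example.org 8080 <nil>
}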
2 changes: 1 addition & 1 deletion elan/rpc/compression_test.go
@@ -141,7 +141,7 @@ func testMain(m *testing.M) int {
 	lis, s := startServer(grpcutil.Opts{
 		Host: "127.0.0.1",
 		Port: 7777,
-	}, storage, 5, 1000, 1000)
+	}, storage, 5, 5, 1000, 1000)
 	go grpcutil.ServeForever(lis, s)
 	defer s.Stop()

90 changes: 90 additions & 0 deletions elan/rpc/retry.go
@@ -0,0 +1,90 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"gocloud.dev/blob"
+	"gocloud.dev/gcerrors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// retryBucket adds retry logic on top of every bucket method that can
+// return an error. Calls are retried only while the error they return is
+// considered retryable (see isErrorRetryable) and the request context has
+// not been cancelled.
+type retryBucket struct {
+	bucket
+	retries int
+}
+
+func (b retryBucket) NewRangeReader(ctx context.Context, key string, offset, length int64, opts *blob.ReaderOptions) (io.ReadCloser, error) {
+	return retryWithReturnValue(ctx, b.retries, func() (io.ReadCloser, error) {
+		return b.bucket.NewRangeReader(ctx, key, offset, length, opts)
+	})
+}
+
+func (b retryBucket) NewWriter(ctx context.Context, key string, opts *blob.WriterOptions) (io.WriteCloser, error) {
+	return retryWithReturnValue(ctx, b.retries, func() (io.WriteCloser, error) {
+		return b.bucket.NewWriter(ctx, key, opts)
+	})
+}
+
+func (b retryBucket) WriteAll(ctx context.Context, key string, data []byte) error {
+	return retry(ctx, b.retries, func() error {
+		return b.bucket.WriteAll(ctx, key, data)
+	})
+}
+
+func (b retryBucket) ReadAll(ctx context.Context, key string) ([]byte, error) {
+	return retryWithReturnValue(ctx, b.retries, func() ([]byte, error) {
+		return b.bucket.ReadAll(ctx, key)
+	})
+}
+
+func (b retryBucket) Exists(ctx context.Context, key string) (bool, error) {
+	return retryWithReturnValue(ctx, b.retries, func() (bool, error) {
+		return b.bucket.Exists(ctx, key)
+	})
+}
+
+func (b retryBucket) Delete(ctx context.Context, key string, hard bool) error {
+	return retry(ctx, b.retries, func() error {
+		return b.bucket.Delete(ctx, key, hard)
+	})
+}
+
+// retryWithReturnValue adapts retry to functions that also return a value,
+// handing back the value captured on the final attempt.
+func retryWithReturnValue[V any](ctx context.Context, retries int, f func() (V, error)) (V, error) {
+	var (
+		ret V
+		err error
+	)
+	err = retry(ctx, retries, func() error {
+		ret, err = f()
+		return err
+	})
+	return ret, err
+}
+
+// retry calls f once, then retries it up to `retries` more times, stopping
+// early once it succeeds, its error stops being retryable, or the context
+// is cancelled. Any error that survives the retries is annotated with the
+// number of retries made.
+func retry(ctx context.Context, retries int, f func() error) error {
+	var (
+		err = f()
+		r   int
+	)
+	for r = 0; r < retries && err != nil && ctx.Err() == nil && isErrorRetryable(err); r++ {
+		err = f()
+	}
+	if err != nil && r > 0 {
+		return fmt.Errorf("%w (retries: %d)", err, r)
+	}
+	return err
+}
+
+// isErrorRetryable reports whether err looks transient, checking both its
+// gocloud.dev error code and its gRPC status code.
+func isErrorRetryable(err error) bool {
+	return (gcerrors.Code(err) == gcerrors.Internal || status.Code(err) == codes.Internal) ||
+		(gcerrors.Code(err) == gcerrors.Canceled || status.Code(err) == codes.Canceled) ||
+		(gcerrors.Code(err) == gcerrors.DeadlineExceeded || status.Code(err) == codes.DeadlineExceeded) ||
+		(gcerrors.Code(err) == gcerrors.ResourceExhausted || status.Code(err) == codes.ResourceExhausted) ||
+		(gcerrors.Code(err) == gcerrors.Unknown || status.Code(err) == codes.Unknown)
+}
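
To make the retry flow concrete: one initial attempt, then at most `retries` further attempts while the error stays retryable and the context is live. A self-contained sketch of that loop (the failing function and sentinel error are invented for illustration; the real code classifies errors via isErrorRetryable):

package main

import (
	"context"
	"errors"
	"fmt"
)

// errTransient stands in for an error isErrorRetryable would accept.
var errTransient = errors.New("internal: transient storage failure")

// retrySketch mirrors retry above, with errors.Is in place of isErrorRetryable.
func retrySketch(ctx context.Context, retries int, f func() error) error {
	var (
		err = f() // always one initial attempt
		r   int
	)
	for r = 0; r < retries && err != nil && ctx.Err() == nil && errors.Is(err, errTransient); r++ {
		err = f()
	}
	if err != nil && r > 0 {
		return fmt.Errorf("%w (retries: %d)", err, r)
	}
	return err
}

func main() {
	attempts := 0
	err := retrySketch(context.Background(), 5, func() error {
		attempts++
		if attempts < 3 {
			return errTransient // first two attempts fail
		}
		return nil
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err) // attempts=3 err=<nil>
}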
12 changes: 6 additions & 6 deletions elan/rpc/rpc.go
@@ -142,19 +142,19 @@ func init() {
 }
 
 // ServeForever serves on the given port until terminated.
-func ServeForever(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) {
-	lis, s := startServer(opts, storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize)
+func ServeForever(opts grpcutil.Opts, storage string, storageRetries, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) {
+	lis, s := startServer(opts, storage, storageRetries, parallelism, maxDirCacheSize, maxKnownBlobCacheSize)
 	grpcutil.ServeForever(lis, s)
 }
 
-func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) *server {
+func createServer(storage string, storageRetries, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) *server {
 	dec, _ := zstd.NewReader(nil)
 	enc, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
 	return &server{
 		bytestreamRe:   regexp.MustCompile("(?:uploads/[0-9a-f-]+/)?(blobs|compressed-blobs/zstd)/([0-9a-f]+)/([0-9]+)"),
 		storageRoot:    strings.TrimPrefix(storage, "file://"),
 		isFileStorage:  strings.HasPrefix(storage, "file://"),
-		bucket:         mustOpenStorage(storage),
+		bucket:         mustOpenStorage(storage, storageRetries),
 		limiter:        make(chan struct{}, parallelism),
 		dirCache:       mustCache(maxDirCacheSize),
 		knownBlobCache: mustCache(maxKnownBlobCacheSize),
@@ -163,8 +163,8 @@ func createServer(maxDirCacheSize, maxKnownBlob
 	}
 }
 
-func startServer(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) (net.Listener, *grpc.Server) {
-	srv := createServer(storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize)
+func startServer(opts grpcutil.Opts, storage string, storageRetries, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) (net.Listener, *grpc.Server) {
+	srv := createServer(storage, storageRetries, parallelism, maxDirCacheSize, maxKnownBlobCacheSize)
 	lis, s := grpcutil.NewServer(opts)
 	pb.RegisterCapabilitiesServer(s, srv)
 	pb.RegisterActionCacheServer(s, srv)
8 changes: 6 additions & 2 deletions elan/rpc/storage.go
@@ -22,12 +22,16 @@ type bucket interface {
 	Delete(ctx context.Context, key string, hard bool) error
 }
 
-func mustOpenStorage(url string) bucket {
+func mustOpenStorage(url string, retries int) bucket {
 	bucket, err := blob.OpenBucket(context.Background(), url)
 	if err != nil {
 		log.Fatalf("Failed to open storage %s: %v", url, err)
 	}
-	return &adapter{bucket: bucket}
+	a := &adapter{bucket: bucket}
+	if retries > 0 {
+		return &retryBucket{bucket: a, retries: retries}
+	}
+	return a
 }
 
 type adapter struct {
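
Taken together, mustOpenStorage only layers the retry decorator on when retries is positive. A standalone sketch of that decision with stand-in types (names here are illustrative; the real bucket interface and adapter wrap gocloud.dev/blob):

package main

import (
	"context"
	"fmt"
)

// bucket mirrors the storage interface, trimmed to a single method.
type bucket interface {
	Exists(ctx context.Context, key string) (bool, error)
}

// adapter stands in for the gocloud.dev-backed implementation.
type adapter struct{}

func (adapter) Exists(context.Context, string) (bool, error) { return false, nil }

// retryBucket embeds the wrapped bucket; its methods would be overridden
// with retrying versions, as in retry.go above.
type retryBucket struct {
	bucket
	retries int
}

// openStorage mirrors mustOpenStorage's choice: wrap only when retries > 0,
// so --storage_retries=0 opts out of the retry layer entirely.
func openStorage(retries int) bucket {
	a := adapter{}
	if retries > 0 {
		return retryBucket{bucket: a, retries: retries}
	}
	return a
}

func main() {
	fmt.Printf("%T\n", openStorage(5)) // main.retryBucket
	fmt.Printf("%T\n", openStorage(0)) // main.adapter
}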