diff --git a/storage/acl.go b/storage/acl.go index c59c380ee2a5..0c2374008bd5 100644 --- a/storage/acl.go +++ b/storage/acl.go @@ -121,12 +121,12 @@ func (a *ACLHandle) List(ctx context.Context) (rules []ACLRule, err error) { func (a *ACLHandle) bucketDefaultList(ctx context.Context) ([]ACLRule, error) { var acls *raw.ObjectAccessControls var err error + req := a.c.raw.DefaultObjectAccessControls.List(a.bucket) + a.configureCall(ctx, req) err = run(ctx, func() error { - req := a.c.raw.DefaultObjectAccessControls.List(a.bucket) - a.configureCall(ctx, req) acls, err = req.Do() return err - }, a.retry, true) + }, a.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -139,18 +139,18 @@ func (a *ACLHandle) bucketDefaultDelete(ctx context.Context, entity ACLEntity) e return run(ctx, func() error { return req.Do() - }, a.retry, false) + }, a.retry, false, setRetryHeaderHTTP(req)) } func (a *ACLHandle) bucketList(ctx context.Context) ([]ACLRule, error) { var acls *raw.BucketAccessControls var err error + req := a.c.raw.BucketAccessControls.List(a.bucket) + a.configureCall(ctx, req) err = run(ctx, func() error { - req := a.c.raw.BucketAccessControls.List(a.bucket) - a.configureCall(ctx, req) acls, err = req.Do() return err - }, a.retry, true) + }, a.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -168,7 +168,7 @@ func (a *ACLHandle) bucketSet(ctx context.Context, entity ACLEntity, role ACLRol return run(ctx, func() error { _, err := req.Do() return err - }, a.retry, false) + }, a.retry, false, setRetryHeaderHTTP(req)) } func (a *ACLHandle) bucketDelete(ctx context.Context, entity ACLEntity) error { @@ -176,18 +176,18 @@ func (a *ACLHandle) bucketDelete(ctx context.Context, entity ACLEntity) error { a.configureCall(ctx, req) return run(ctx, func() error { return req.Do() - }, a.retry, false) + }, a.retry, false, setRetryHeaderHTTP(req)) } func (a *ACLHandle) objectList(ctx context.Context) ([]ACLRule, error) { var acls *raw.ObjectAccessControls var err error + req := a.c.raw.ObjectAccessControls.List(a.bucket, a.object) + a.configureCall(ctx, req) err = run(ctx, func() error { - req := a.c.raw.ObjectAccessControls.List(a.bucket, a.object) - a.configureCall(ctx, req) acls, err = req.Do() return err - }, a.retry, true) + }, a.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -215,7 +215,7 @@ func (a *ACLHandle) objectSet(ctx context.Context, entity ACLEntity, role ACLRol return run(ctx, func() error { _, err := req.Do() return err - }, a.retry, false) + }, a.retry, false, setRetryHeaderHTTP(req)) } func (a *ACLHandle) objectDelete(ctx context.Context, entity ACLEntity) error { @@ -223,7 +223,7 @@ func (a *ACLHandle) objectDelete(ctx context.Context, entity ACLEntity) error { a.configureCall(ctx, req) return run(ctx, func() error { return req.Do() - }, a.retry, false) + }, a.retry, false, setRetryHeaderHTTP(req)) } func (a *ACLHandle) configureCall(ctx context.Context, call interface{ Header() http.Header }) { diff --git a/storage/bucket.go b/storage/bucket.go index 3dfcce843999..101f695b5a6a 100644 --- a/storage/bucket.go +++ b/storage/bucket.go @@ -103,7 +103,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck if attrs != nil && attrs.PredefinedDefaultObjectACL != "" { req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL) } - return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true) + return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true, setRetryHeaderHTTP(req)) } // Delete deletes the Bucket. @@ -116,7 +116,7 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) { return err } - return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true) + return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true, setRetryHeaderHTTP(req)) } func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) { @@ -184,7 +184,7 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error err = run(ctx, func() error { resp, err = req.Context(ctx).Do() return err - }, b.retry, true) + }, b.retry, true, setRetryHeaderHTTP(req)) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrBucketNotExist @@ -232,7 +232,7 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) ( return err } - if err := run(ctx, call, b.retry, isIdempotent); err != nil { + if err := run(ctx, call, b.retry, isIdempotent, setRetryHeaderHTTP(req)); err != nil { return nil, err } return newBucket(rawBucket) @@ -1341,7 +1341,7 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error { return run(ctx, func() error { _, err := req.Context(ctx).Do() return err - }, b.retry, true) + }, b.retry, true, setRetryHeaderHTTP(req)) } // applyBucketConds modifies the provided call using the conditions in conds. @@ -1999,7 +1999,7 @@ func (it *ObjectIterator) fetch(pageSize int, pageToken string) (string, error) err = run(it.ctx, func() error { resp, err = req.Context(it.ctx).Do() return err - }, it.bucket.retry, true) + }, it.bucket.retry, true, setRetryHeaderHTTP(req)) if err != nil { var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { @@ -2086,7 +2086,7 @@ func (it *BucketIterator) fetch(pageSize int, pageToken string) (token string, e err = run(it.ctx, func() error { resp, err = req.Context(it.ctx).Do() return err - }, it.client.retry, true) + }, it.client.retry, true, setRetryHeaderHTTP(req)) if err != nil { return "", err } diff --git a/storage/copy.go b/storage/copy.go index f180753520c6..26865fa47148 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -142,7 +142,7 @@ func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.Rewr retryCall := func() error { res, err = call.Do(); return err } isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) - if err := run(ctx, retryCall, c.dst.retry, isIdempotent); err != nil { + if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil { return nil, err } c.RewriteToken = res.RewriteToken @@ -237,7 +237,7 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { retryCall := func() error { obj, err = call.Do(); return err } isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) - if err := run(ctx, retryCall, c.dst.retry, isIdempotent); err != nil { + if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil { return nil, err } return newObject(obj), nil diff --git a/storage/go.mod b/storage/go.mod index 43e9fdfd5d30..59daffe3db01 100644 --- a/storage/go.mod +++ b/storage/go.mod @@ -8,6 +8,7 @@ require ( cloud.google.com/go/iam v0.3.0 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.7 + github.com/google/uuid v1.1.2 github.com/googleapis/gax-go/v2 v2.3.0 github.com/googleapis/go-type-adapters v1.0.0 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 diff --git a/storage/go.sum b/storage/go.sum index 891a63287f0d..433427d1642a 100644 --- a/storage/go.sum +++ b/storage/go.sum @@ -157,6 +157,7 @@ github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 7ef24c8cdcec..2efde67151ae 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -122,7 +122,7 @@ func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project strin var err error resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { return "", err } @@ -154,7 +154,7 @@ func (c *grpcStorageClient) CreateBucket(ctx context.Context, project string, at battrs = newBucketFromProto(res) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) return battrs, err } @@ -187,7 +187,7 @@ func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opt err = run(it.ctx, func() error { buckets, next, err = gitr.InternalFetch(pageSize, pageToken) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { return "", err } @@ -223,7 +223,7 @@ func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, con return run(ctx, func() error { return c.raw.DeleteBucket(ctx, req, s.gax...) - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) } func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { @@ -245,7 +245,7 @@ func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds battrs = newBucketFromProto(res) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { return nil, ErrBucketNotExist @@ -333,7 +333,7 @@ func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uat res, err := c.raw.UpdateBucket(ctx, req, s.gax...) battrs = newBucketFromProto(res) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) return battrs, err } @@ -354,7 +354,7 @@ func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucke return run(ctx, func() error { _, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) } func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator { @@ -384,7 +384,7 @@ func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Q err = run(it.ctx, func() error { objects, token, err = gitr.InternalFetch(pageSize, pageToken) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { err = ErrBucketNotExist @@ -547,7 +547,7 @@ func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, v var err error rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) return rp, err } @@ -564,7 +564,7 @@ func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, p return run(ctx, func() error { _, err := c.raw.SetIamPolicy(ctx, req, s.gax...) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) } func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) { @@ -579,7 +579,7 @@ func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource str var err error res, err = c.raw.TestIamPermissions(ctx, req, s.gax...) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) if err != nil { return nil, err } diff --git a/storage/hmac.go b/storage/hmac.go index 6f834d4e9edd..2de721e81544 100644 --- a/storage/hmac.go +++ b/storage/hmac.go @@ -130,7 +130,7 @@ func (hkh *HMACKeyHandle) Get(ctx context.Context, opts ...HMACKeyOption) (*HMAC err = run(ctx, func() error { metadata, err = call.Context(ctx).Do() return err - }, hkh.retry, true) + }, hkh.retry, true, setRetryHeaderHTTP(call)) if err != nil { return nil, err } @@ -159,7 +159,7 @@ func (hkh *HMACKeyHandle) Delete(ctx context.Context, opts ...HMACKeyOption) err return run(ctx, func() error { return delCall.Context(ctx).Do() - }, hkh.retry, true) + }, hkh.retry, true, setRetryHeaderHTTP(delCall)) } func pbHmacKeyToHMACKey(pb *raw.HmacKey, updatedTimeCanBeNil bool) (*HMACKey, error) { @@ -221,7 +221,7 @@ func (c *Client) CreateHMACKey(ctx context.Context, projectID, serviceAccountEma h, err := call.Context(ctx).Do() hkPb = h return err - }, c.retry, false); err != nil { + }, c.retry, false, setRetryHeaderHTTP(call)); err != nil { return nil, err } @@ -267,7 +267,7 @@ func (h *HMACKeyHandle) Update(ctx context.Context, au HMACKeyAttrsToUpdate, opt err = run(ctx, func() error { metadata, err = call.Context(ctx).Do() return err - }, h.retry, isIdempotent) + }, h.retry, isIdempotent, setRetryHeaderHTTP(call)) if err != nil { return nil, err @@ -373,7 +373,7 @@ func (it *HMACKeysIterator) fetch(pageSize int, pageToken string) (token string, err = run(it.ctx, func() error { resp, err = call.Context(ctx).Do() return err - }, it.retry, true) + }, it.retry, true, setRetryHeaderHTTP(call)) if err != nil { return "", err } diff --git a/storage/http_client.go b/storage/http_client.go index 7fd5069babaf..1ec55150f500 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -145,7 +145,7 @@ func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project strin var err error res, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return "", err } @@ -182,7 +182,7 @@ func (c *httpStorageClient) CreateBucket(ctx context.Context, project string, at } battrs, err = newBucket(b) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) return battrs, err } @@ -206,7 +206,7 @@ func (c *httpStorageClient) ListBuckets(ctx context.Context, project string, opt err = run(it.ctx, func() error { resp, err = req.Context(it.ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return "", err } @@ -241,7 +241,7 @@ func (c *httpStorageClient) DeleteBucket(ctx context.Context, bucket string, con req.UserProject(s.userProject) } - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent) + return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) { @@ -260,7 +260,7 @@ func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds err = run(ctx, func() error { resp, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { @@ -294,7 +294,7 @@ func (c *httpStorageClient) UpdateBucket(ctx context.Context, bucket string, uat err = run(ctx, func() error { rawBucket, err = req.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -313,7 +313,7 @@ func (c *httpStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucke return run(ctx, func() error { _, err := req.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator { s := callSettings(c.settings, opts...) @@ -352,7 +352,7 @@ func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Q err = run(it.ctx, func() error { resp, err = req.Context(it.ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { var e *googleapi.Error if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { @@ -394,19 +394,19 @@ func (c *httpStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket s s := callSettings(c.settings, opts...) req := c.raw.DefaultObjectAccessControls.Delete(bucket, string(entity)) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent) + return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { s := callSettings(c.settings, opts...) var acls *raw.ObjectAccessControls var err error + req := c.raw.DefaultObjectAccessControls.List(bucket) + configureACLCall(ctx, s.userProject, req) err = run(ctx, func() error { - req := c.raw.DefaultObjectAccessControls.List(bucket) - configureACLCall(ctx, s.userProject, req) acls, err = req.Do() return err - }, s.retry, true) + }, s.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -422,19 +422,19 @@ func (c *httpStorageClient) DeleteBucketACL(ctx context.Context, bucket string, s := callSettings(c.settings, opts...) req := c.raw.BucketAccessControls.Delete(bucket, string(entity)) configureACLCall(ctx, s.userProject, req) - return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent) + return run(ctx, func() error { return req.Context(ctx).Do() }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) } func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) { s := callSettings(c.settings, opts...) var acls *raw.BucketAccessControls var err error + req := c.raw.BucketAccessControls.List(bucket) + configureACLCall(ctx, s.userProject, req) err = run(ctx, func() error { - req := c.raw.BucketAccessControls.List(bucket) - configureACLCall(ctx, s.userProject, req) acls, err = req.Do() return err - }, s.retry, true) + }, s.retry, true, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -448,15 +448,15 @@ func (c *httpStorageClient) UpdateBucketACL(ctx context.Context, bucket string, Entity: string(entity), Role: string(role), } + req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl) + configureACLCall(ctx, s.userProject, req) var aclRule ACLRule var err error err = run(ctx, func() error { - req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl) - configureACLCall(ctx, s.userProject, req) acl, err = req.Do() aclRule = toBucketACLRule(acl) return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(req)) if err != nil { return nil, err } @@ -516,7 +516,7 @@ func (c *httpStorageClient) GetIamPolicy(ctx context.Context, resource string, v var err error rp, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return nil, err } @@ -536,7 +536,7 @@ func (c *httpStorageClient) SetIamPolicy(ctx context.Context, resource string, p return run(ctx, func() error { _, err := call.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) } func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) { @@ -551,7 +551,7 @@ func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource str var err error res, err = call.Context(ctx).Do() return err - }, s.retry, s.idempotent) + }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) if err != nil { return nil, err } diff --git a/storage/iam.go b/storage/iam.go index a870ab8ee8e8..cf9f899a487b 100644 --- a/storage/iam.go +++ b/storage/iam.go @@ -57,7 +57,7 @@ func (c *iamClient) GetWithVersion(ctx context.Context, resource string, request err = run(ctx, func() error { rp, err = call.Context(ctx).Do() return err - }, c.retry, true) + }, c.retry, true, setRetryHeaderHTTP(call)) if err != nil { return nil, err } @@ -78,7 +78,7 @@ func (c *iamClient) Set(ctx context.Context, resource string, p *iampb.Policy) ( return run(ctx, func() error { _, err := call.Context(ctx).Do() return err - }, c.retry, isIdempotent) + }, c.retry, isIdempotent, setRetryHeaderHTTP(call)) } func (c *iamClient) Test(ctx context.Context, resource string, perms []string) (permissions []string, err error) { @@ -94,7 +94,7 @@ func (c *iamClient) Test(ctx context.Context, resource string, perms []string) ( err = run(ctx, func() error { res, err = call.Context(ctx).Do() return err - }, c.retry, true) + }, c.retry, true, setRetryHeaderHTTP(call)) if err != nil { return nil, err } diff --git a/storage/invoke.go b/storage/invoke.go index 98febedacf0f..d0f0dd8d1718 100644 --- a/storage/invoke.go +++ b/storage/invoke.go @@ -17,12 +17,17 @@ package storage import ( "context" "errors" + "fmt" "io" "net" + "net/http" "net/url" "strings" "cloud.google.com/go/internal" + "cloud.google.com/go/internal/version" + sinternal "cloud.google.com/go/storage/internal" + "github.com/google/uuid" gax "github.com/googleapis/gax-go/v2" "google.golang.org/api/googleapi" "google.golang.org/grpc/codes" @@ -30,15 +35,20 @@ import ( ) var defaultRetry *retryConfig = &retryConfig{} +var xGoogDefaultHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), sinternal.Version) // run determines whether a retry is necessary based on the config and // idempotency information. It then calls the function with or without retries // as appropriate, using the configured settings. -func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool) error { +func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool, setHeader func(string, int)) error { + attempts := 1 + invocationID := uuid.New().String() + if retry == nil { retry = defaultRetry } if (retry.policy == RetryIdempotent && !isIdempotent) || retry.policy == RetryNever { + setHeader(invocationID, attempts) return call() } bo := gax.Backoff{} @@ -51,12 +61,34 @@ func run(ctx context.Context, call func() error, retry *retryConfig, isIdempoten if retry.shouldRetry != nil { errorFunc = retry.shouldRetry } + return internal.Retry(ctx, bo, func() (stop bool, err error) { + setHeader(invocationID, attempts) err = call() + attempts++ return !errorFunc(err), err }) } +func setRetryHeaderHTTP(req interface{ Header() http.Header }) func(string, int) { + return func(invocationID string, attempts int) { + if req == nil { + return + } + header := req.Header() + invocationHeader := fmt.Sprintf("gccl-invocation-id/%v gccl-attempt-count/%v", invocationID, attempts) + xGoogHeader := strings.Join([]string{invocationHeader, xGoogDefaultHeader}, " ") + header.Set("x-goog-api-client", xGoogHeader) + } +} + +// TODO: Implement method setting header via context for gRPC +func setRetryHeaderGRPC(_ context.Context) func(string, int) { + return func(_ string, _ int) { + return + } +} + func shouldRetry(err error) bool { if err == nil { return false diff --git a/storage/invoke_test.go b/storage/invoke_test.go index 50aa43527a73..02139228592c 100644 --- a/storage/invoke_test.go +++ b/storage/invoke_test.go @@ -17,9 +17,13 @@ package storage import ( "context" "errors" + "fmt" "io" "net" + "net/http" "net/url" + "regexp" + "strings" "testing" "golang.org/x/xerrors" @@ -164,23 +168,53 @@ func TestInvoke(t *testing.T) { } { t.Run(test.desc, func(s *testing.T) { counter := 0 + req := &fakeApiaryRequest{header: http.Header{}} + var initialHeader string call := func() error { + if counter == 0 { + initialHeader = req.Header()["X-Goog-Api-Client"][0] + } counter++ if counter <= test.count { return test.initialErr } return test.finalErr } - got := run(ctx, call, test.retry, test.isIdempotentValue) + got := run(ctx, call, test.retry, test.isIdempotentValue, setRetryHeaderHTTP(req)) if test.expectFinalErr && got != test.finalErr { s.Errorf("got %v, want %v", got, test.finalErr) } else if !test.expectFinalErr && got != test.initialErr { s.Errorf("got %v, want %v", got, test.initialErr) } + gotHeader := req.Header()["X-Goog-Api-Client"][0] + wantAttempts := 1 + test.count + if !test.expectFinalErr { + wantAttempts = 1 + } + wantHeader := strings.ReplaceAll(initialHeader, "gccl-attempt-count/1", fmt.Sprintf("gccl-attempt-count/%v", wantAttempts)) + if gotHeader != wantHeader { + t.Errorf("case %q, retry header:\ngot %v\nwant %v", test.desc, gotHeader, wantHeader) + } + wantHeaderFormat := "gccl-invocation-id/.{36} gccl-attempt-count/[0-9]+ gl-go/.* gccl/" + match, err := regexp.MatchString(wantHeaderFormat, gotHeader) + if err != nil { + s.Fatalf("compiling regexp: %v", err) + } + if !match { + s.Errorf("X-Goog-Api-Client header has wrong format\ngot %v\nwant regex matching %v", gotHeader, wantHeaderFormat) + } }) } } +type fakeApiaryRequest struct { + header http.Header +} + +func (f *fakeApiaryRequest) Header() http.Header { + return f.header +} + func TestShouldRetry(t *testing.T) { t.Parallel() diff --git a/storage/notifications.go b/storage/notifications.go index 333664f92c33..dd43822b6a34 100644 --- a/storage/notifications.go +++ b/storage/notifications.go @@ -142,7 +142,7 @@ func (b *BucketHandle) AddNotification(ctx context.Context, n *Notification) (re err = run(ctx, func() error { rn, err = call.Context(ctx).Do() return err - }, b.retry, false) + }, b.retry, false, setRetryHeaderHTTP(call)) if err != nil { return nil, err } @@ -164,7 +164,7 @@ func (b *BucketHandle) Notifications(ctx context.Context) (n map[string]*Notific err = run(ctx, func() error { res, err = call.Context(ctx).Do() return err - }, b.retry, true) + }, b.retry, true, setRetryHeaderHTTP(call)) if err != nil { return nil, err } @@ -191,5 +191,5 @@ func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) (err e } return run(ctx, func() error { return call.Context(ctx).Do() - }, b.retry, true) + }, b.retry, true, setRetryHeaderHTTP(call)) } diff --git a/storage/reader.go b/storage/reader.go index 86153256c380..1667011c96bb 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -210,7 +210,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) gen = gen64 } return nil - }, o.retry, true) + }, o.retry, true, setRetryHeaderHTTP(nil)) if err != nil { return nil, err } @@ -492,7 +492,7 @@ func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, lengt msg, err = stream.Recv() return err - }, o.retry, true) + }, o.retry, true, setRetryHeaderHTTP(nil)) if err != nil { // Close the stream context we just created to ensure we don't leak // resources. diff --git a/storage/storage.go b/storage/storage.go index 841d405ac708..719f39723bce 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -39,7 +39,6 @@ import ( "cloud.google.com/go/internal/optional" "cloud.google.com/go/internal/trace" - "cloud.google.com/go/internal/version" "cloud.google.com/go/storage/internal" gapic "cloud.google.com/go/storage/internal/apiv2" "github.com/googleapis/gax-go/v2" @@ -87,10 +86,9 @@ const ( ScopeReadWrite = raw.DevstorageReadWriteScope ) -var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), internal.Version) - +// TODO: remove this once header with invocation ID is applied to all methods. func setClientHeader(headers http.Header) { - headers.Set("x-goog-api-client", xGoogHeader) + headers.Set("x-goog-api-client", xGoogDefaultHeader) } // Client is a client for interacting with Google Cloud Storage. @@ -916,7 +914,7 @@ func (o *ObjectHandle) Attrs(ctx context.Context) (attrs *ObjectAttrs, err error } var obj *raw.Object setClientHeader(call.Header()) - err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true) + err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true, setRetryHeaderHTTP(call)) var e *googleapi.Error if errors.As(err, &e) && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist @@ -1021,7 +1019,7 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) ( if o.conds != nil && o.conds.MetagenerationMatch != 0 { isIdempotent = true } - err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, isIdempotent) + err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, isIdempotent, setRetryHeaderHTTP(call)) var e *googleapi.Error if errors.As(err, &e) && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist @@ -1091,7 +1089,7 @@ func (o *ObjectHandle) Delete(ctx context.Context) error { if (o.conds != nil && o.conds.GenerationMatch != 0) || o.gen >= 0 { isIdempotent = true } - err := run(ctx, func() error { return call.Do() }, o.retry, isIdempotent) + err := run(ctx, func() error { return call.Do() }, o.retry, isIdempotent, setRetryHeaderHTTP(call)) var e *googleapi.Error if errors.As(err, &e) && e.Code == http.StatusNotFound { return ErrObjectNotExist @@ -2009,7 +2007,7 @@ func (c *Client) ServiceAccount(ctx context.Context, projectID string) (string, err = run(ctx, func() error { res, err = r.Context(ctx).Do() return err - }, c.retry, true) + }, c.retry, true, setRetryHeaderHTTP(r)) if err != nil { return "", err }