From 16b2b83be638b18c1e3134d01c63251cb836b333 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 6 Nov 2018 13:50:20 +1100 Subject: [PATCH] [CONNECTOPS-97] Pass in a bespoke HTTP transport to GCS client This will let us experiment with things around the state of the HTTP/2 connection that storage uses, and hopefully track down the cause of the REFUSED_STREAM errors we are seeing --- physical/gcs/gcs.go | 97 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 4 deletions(-) diff --git a/physical/gcs/gcs.go b/physical/gcs/gcs.go index 3138aa5ddf01..61009a1fc120 100644 --- a/physical/gcs/gcs.go +++ b/physical/gcs/gcs.go @@ -2,9 +2,13 @@ package gcs import ( "context" + "crypto/tls" "errors" "fmt" + "io" "io/ioutil" + "net" + "net/http" "os" "sort" "strconv" @@ -19,6 +23,9 @@ import ( "cloud.google.com/go/storage" "github.com/armon/go-metrics" + "golang.org/x/net/http2" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" "google.golang.org/api/iterator" "google.golang.org/api/option" ) @@ -79,6 +86,8 @@ type Backend struct { // logger and permitPool are internal constructs logger log.Logger permitPool *physical.PermitPool + + keyLogger io.WriteCloser } // NewBackend constructs a Google Cloud Storage backend with the given @@ -141,21 +150,77 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error ) logger.Debug("creating client") + ctx := context.Background() + dialer := net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: false, + } + tlsConfig := &tls.Config{} + + var keyLogger io.WriteCloser + didReturnKeyLogger := false + defer func() { + if keyLogger != nil && !didReturnKeyLogger { + keyLogger.Close() + } + }() + if keylogFile, ok := os.LookupEnv("SSLKEYLOGFILE"); ok { + keyLogger, err = os.OpenFile(keylogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return nil, errwrap.Wrapf("failed to open SSLKEYLOGFILE: {{err}}", err) + } + tlsConfig.KeyLogWriter = keyLogger + } + + httpTransport := &http.Transport{ + TLSClientConfig: tlsConfig, + Proxy: http.ProxyFromEnvironment, + DialContext: dialer.DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + if err := http2.ConfigureTransport(httpTransport); err != nil { + return nil, errwrap.Wrapf("failed to configure http2: {{err}}", err) + } + uaTransportWrapper := &userAgentTransport{ + base: httpTransport, + userAgent: useragent.String(), + } + + tokenSource, err := google.DefaultTokenSource(ctx) + // Client - opts := []option.ClientOption{option.WithUserAgent(useragent.String())} if credentialsFile := c["credentials_file"]; credentialsFile != "" { logger.Warn("specifying credentials_file as an option is " + "deprecated. Please use the GOOGLE_APPLICATION_CREDENTIALS environment " + "variable or instance credentials instead.") - opts = append(opts, option.WithServiceAccountFile(credentialsFile)) + jsonKeyBytes, err := ioutil.ReadFile(credentialsFile) + if err != nil { + return nil, errwrap.Wrapf("failed to read credentials file: {{err}}", err) + } + tokenSource, err = google.JWTAccessTokenSourceFromJSON(jsonKeyBytes, storage.ScopeFullControl) + if err != nil { + return nil, errwrap.Wrapf("failed to construct credentials token source: {{err}}", err) + } } - ctx := context.Background() - client, err := storage.NewClient(ctx, opts...) + oauth2Transport := &oauth2.Transport{ + Source: tokenSource, + Base: uaTransportWrapper, + } + httpClient := &http.Client{ + Transport: oauth2Transport, + } + + client, err := storage.NewClient(ctx, option.WithHTTPClient(httpClient)) if err != nil { return nil, errwrap.Wrapf("failed to create storage client: {{err}}", err) } + didReturnKeyLogger = true return &Backend{ bucket: bucket, haEnabled: haEnabled, @@ -163,6 +228,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error client: client, permitPool: physical.NewPermitPool(maxParallel), logger: logger, + keyLogger: keyLogger, }, nil } @@ -294,3 +360,26 @@ func extractInt(s string) (int, error) { } return strconv.Atoi(s) } + +type userAgentTransport struct { + userAgent string + base http.RoundTripper +} + +func (t userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) { + rt := t.base + if rt == nil { + return nil, errors.New("transport: no Transport specified") + } + if t.userAgent == "" { + return rt.RoundTrip(req) + } + newReq := *req + newReq.Header = make(http.Header) + for k, vv := range req.Header { + newReq.Header[k] = vv + } + // TODO(cbro): append to existing User-Agent header? + newReq.Header["User-Agent"] = []string{t.userAgent} + return rt.RoundTrip(&newReq) +}