From 2eef6978d75958999df6c8d7c980df6212e5a241 Mon Sep 17 00:00:00 2001 From: BrennaEpp Date: Thu, 6 Jul 2023 12:41:36 -0700 Subject: [PATCH 1/3] chore(storage): add GCSFuse client config [benchmarking] --- storage/internal/benchmarks/client_pool.go | 32 ++++++++++++++++------ storage/internal/benchmarks/main.go | 4 +++ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/storage/internal/benchmarks/client_pool.go b/storage/internal/benchmarks/client_pool.go index dc504907f1d2..f6161a15a94b 100644 --- a/storage/internal/benchmarks/client_pool.go +++ b/storage/internal/benchmarks/client_pool.go @@ -16,6 +16,7 @@ package main import ( "context" + "crypto/tls" "log" "net/http" "os" @@ -87,7 +88,7 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { nonBenchmarkingClients, closeNonBenchmarking = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, useDefault, useDefault, false) + return initializeHTTPClient(ctx, useDefault, useDefault, false, opts.gcsFuse) }, 1, ) @@ -96,7 +97,7 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == xmlAPI { xmlClients, closeXML = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, false) + return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, false, opts.gcsFuse) }, opts.numClients, ) @@ -108,7 +109,7 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == jsonAPI || opts.api == xmlAPI { jsonClients, closeJSON = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, true) + return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, true, opts.gcsFuse) }, opts.numClients, ) @@ -156,20 +157,33 @@ func getClient(ctx context.Context, api benchmarkAPI) *storage.Client { // mutex on starting a client so that we can set an env variable for GRPC clients var clientMu sync.Mutex -func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, json bool) (*storage.Client, error) { +func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, json bool, gcsFuse bool) (*storage.Client, error) { opts := []option.ClientOption{} - if writeBufferSize != useDefault || readBufferSize != useDefault { + if writeBufferSize != useDefault || readBufferSize != useDefault || gcsFuse { // We need to modify the underlying HTTP client - base := http.DefaultTransport.(*http.Transport).Clone() + + if gcsFuse { + base = &http.Transport{ + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + // This disables HTTP/2 in transport. + TLSNextProto: make( + map[string]func(string, *tls.Conn) http.RoundTripper, + ), + } + } + base.MaxIdleConnsPerHost = 100 // this is set in Storage as well base.WriteBufferSize = writeBufferSize base.ReadBufferSize = readBufferSize - http2Trans, err := http2.ConfigureTransports(base) - if err == nil { - http2Trans.ReadIdleTimeout = time.Second * 31 + if !gcsFuse { + http2Trans, err := http2.ConfigureTransports(base) + if err == nil { + http2Trans.ReadIdleTimeout = time.Second * 31 + } } trans, err := htransport.NewTransport(ctx, base, diff --git a/storage/internal/benchmarks/main.go b/storage/internal/benchmarks/main.go index 649853ac9d8b..078b61b7371a 100644 --- a/storage/internal/benchmarks/main.go +++ b/storage/internal/benchmarks/main.go @@ -80,6 +80,8 @@ type benchmarkOptions struct { numClients int workload int numObjectsPerDirectory int + + gcsFuse bool } func (b *benchmarkOptions) validate() error { @@ -171,6 +173,8 @@ func parseFlags() { flag.IntVar(&opts.workload, "workload", 1, "which workload to run") flag.IntVar(&opts.numObjectsPerDirectory, "directory_num_objects", 1000, "total number of objects in directory") + flag.BoolVar(&opts.gcsFuse, "gcs_fuse", false, "use GCSFuse configs on client creation") + flag.Parse() if len(projectID) < 1 { From da410cc3cfd25e506749661b7d2410bb53905675 Mon Sep 17 00:00:00 2001 From: BrennaEpp Date: Thu, 6 Jul 2023 13:51:20 -0700 Subject: [PATCH 2/3] add config struct for clients --- storage/internal/benchmarks/client_pool.go | 59 +++++++++++++++------- storage/internal/benchmarks/main.go | 4 +- 2 files changed, 44 insertions(+), 19 deletions(-) diff --git a/storage/internal/benchmarks/client_pool.go b/storage/internal/benchmarks/client_pool.go index f6161a15a94b..3dd9732b50b0 100644 --- a/storage/internal/benchmarks/client_pool.go +++ b/storage/internal/benchmarks/client_pool.go @@ -88,7 +88,10 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { nonBenchmarkingClients, closeNonBenchmarking = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, useDefault, useDefault, false, opts.gcsFuse) + return initializeHTTPClient(ctx, clientConfig{ + writeBufferSize: useDefault, + readBufferSize: useDefault, + }) }, 1, ) @@ -97,7 +100,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == xmlAPI { xmlClients, closeXML = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, false, opts.gcsFuse) + return initializeHTTPClient(ctx, clientConfig{ + writeBufferSize: opts.writeBufferSize, + readBufferSize: opts.readBufferSize, + useJSON: false, + setGCSFuseOpts: opts.gcsFuse, + }) }, opts.numClients, ) @@ -109,7 +117,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == jsonAPI || opts.api == xmlAPI { jsonClients, closeJSON = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, true, opts.gcsFuse) + return initializeHTTPClient(ctx, clientConfig{ + writeBufferSize: opts.writeBufferSize, + readBufferSize: opts.readBufferSize, + useJSON: true, + setGCSFuseOpts: opts.gcsFuse, + }) }, opts.numClients, ) @@ -119,7 +132,11 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == grpcAPI || opts.api == directPath { gRPCClients, closeGRPC = newClientPool( func() (*storage.Client, error) { - return initializeGRPCClient(context.Background(), opts.writeBufferSize, opts.readBufferSize, opts.connPoolSize) + return initializeGRPCClient(context.Background(), clientConfig{ + writeBufferSize: opts.writeBufferSize, + readBufferSize: opts.readBufferSize, + connectionPoolSize: opts.connPoolSize, + }) }, opts.numClients, ) @@ -157,14 +174,22 @@ func getClient(ctx context.Context, api benchmarkAPI) *storage.Client { // mutex on starting a client so that we can set an env variable for GRPC clients var clientMu sync.Mutex -func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, json bool, gcsFuse bool) (*storage.Client, error) { +// Client config +type clientConfig struct { + writeBufferSize, readBufferSize int + useJSON bool // only applicable to HTTP Clients + setGCSFuseOpts bool // only applicable to HTTP Clients + connectionPoolSize int // only applicable to GRPC Clients +} + +func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Client, error) { opts := []option.ClientOption{} - if writeBufferSize != useDefault || readBufferSize != useDefault || gcsFuse { + if config.writeBufferSize != useDefault || config.readBufferSize != useDefault || config.setGCSFuseOpts { // We need to modify the underlying HTTP client base := http.DefaultTransport.(*http.Transport).Clone() - if gcsFuse { + if config.setGCSFuseOpts { base = &http.Transport{ MaxConnsPerHost: 100, MaxIdleConnsPerHost: 100, @@ -176,10 +201,10 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i } base.MaxIdleConnsPerHost = 100 // this is set in Storage as well - base.WriteBufferSize = writeBufferSize - base.ReadBufferSize = readBufferSize + base.WriteBufferSize = config.writeBufferSize + base.ReadBufferSize = config.readBufferSize - if !gcsFuse { + if !config.setGCSFuseOpts { http2Trans, err := http2.ConfigureTransports(base) if err == nil { http2Trans.ReadIdleTimeout = time.Second * 31 @@ -195,7 +220,7 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i opts = append(opts, option.WithHTTPClient(&http.Client{Transport: trans})) } - if json { + if config.useJSON { opts = append(opts, storage.WithJSONReads()) } @@ -207,14 +232,14 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i return client, err } -func initializeGRPCClient(ctx context.Context, writeBufferSize, readBufferSize int, connectionPoolSize int) (*storage.Client, error) { - opts := []option.ClientOption{option.WithGRPCConnectionPool(connectionPoolSize)} +func initializeGRPCClient(ctx context.Context, config clientConfig) (*storage.Client, error) { + opts := []option.ClientOption{option.WithGRPCConnectionPool(config.connectionPoolSize)} - if writeBufferSize != useDefault { - opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(writeBufferSize))) + if config.writeBufferSize != useDefault { + opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(config.writeBufferSize))) } - if readBufferSize != useDefault { - opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(readBufferSize))) + if config.readBufferSize != useDefault { + opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(config.readBufferSize))) } clientMu.Lock() diff --git a/storage/internal/benchmarks/main.go b/storage/internal/benchmarks/main.go index 078b61b7371a..cd864213276e 100644 --- a/storage/internal/benchmarks/main.go +++ b/storage/internal/benchmarks/main.go @@ -153,6 +153,8 @@ func parseFlags() { flag.Int64Var(&opts.minReadOffset, "minimum_read_offset", 0, "minimum read offset in bytes") flag.Int64Var(&opts.maxReadOffset, "maximum_read_offset", 0, "maximum read offset in bytes") + flag.BoolVar(&opts.gcsFuse, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation") + flag.IntVar(&opts.readBufferSize, "read_buffer_size", useDefault, "read buffer size in bytes") flag.IntVar(&opts.writeBufferSize, "write_buffer_size", useDefault, "write buffer size in bytes") @@ -173,8 +175,6 @@ func parseFlags() { flag.IntVar(&opts.workload, "workload", 1, "which workload to run") flag.IntVar(&opts.numObjectsPerDirectory, "directory_num_objects", 1000, "total number of objects in directory") - flag.BoolVar(&opts.gcsFuse, "gcs_fuse", false, "use GCSFuse configs on client creation") - flag.Parse() if len(projectID) < 1 { From 27928b31e2fa7bb9e9c70bea18dbfa10533e2bb1 Mon Sep 17 00:00:00 2001 From: BrennaEpp Date: Thu, 6 Jul 2023 22:30:45 -0700 Subject: [PATCH 3/3] suggestions --- storage/internal/benchmarks/client_pool.go | 19 ++++++++++--------- storage/internal/benchmarks/main.go | 4 ++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/storage/internal/benchmarks/client_pool.go b/storage/internal/benchmarks/client_pool.go index 3dd9732b50b0..2e0d180adc86 100644 --- a/storage/internal/benchmarks/client_pool.go +++ b/storage/internal/benchmarks/client_pool.go @@ -104,7 +104,7 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { writeBufferSize: opts.writeBufferSize, readBufferSize: opts.readBufferSize, useJSON: false, - setGCSFuseOpts: opts.gcsFuse, + setGCSFuseOpts: opts.useGCSFuseConfig, }) }, opts.numClients, @@ -121,7 +121,7 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { writeBufferSize: opts.writeBufferSize, readBufferSize: opts.readBufferSize, useJSON: true, - setGCSFuseOpts: opts.gcsFuse, + setGCSFuseOpts: opts.useGCSFuseConfig, }) }, opts.numClients, @@ -189,6 +189,10 @@ func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Cl // We need to modify the underlying HTTP client base := http.DefaultTransport.(*http.Transport).Clone() + // Set MaxIdleConnsPerHost for parity with the Storage library, as it + // sets this as well + base.MaxIdleConnsPerHost = 100 + if config.setGCSFuseOpts { base = &http.Transport{ MaxConnsPerHost: 100, @@ -198,19 +202,16 @@ func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Cl map[string]func(string, *tls.Conn) http.RoundTripper, ), } - } - - base.MaxIdleConnsPerHost = 100 // this is set in Storage as well - base.WriteBufferSize = config.writeBufferSize - base.ReadBufferSize = config.readBufferSize - - if !config.setGCSFuseOpts { + } else { http2Trans, err := http2.ConfigureTransports(base) if err == nil { http2Trans.ReadIdleTimeout = time.Second * 31 } } + base.WriteBufferSize = config.writeBufferSize + base.ReadBufferSize = config.readBufferSize + trans, err := htransport.NewTransport(ctx, base, option.WithScopes("https://www.googleapis.com/auth/devstorage.full_control")) if err != nil { diff --git a/storage/internal/benchmarks/main.go b/storage/internal/benchmarks/main.go index cd864213276e..3ec08b57d838 100644 --- a/storage/internal/benchmarks/main.go +++ b/storage/internal/benchmarks/main.go @@ -81,7 +81,7 @@ type benchmarkOptions struct { workload int numObjectsPerDirectory int - gcsFuse bool + useGCSFuseConfig bool } func (b *benchmarkOptions) validate() error { @@ -153,7 +153,7 @@ func parseFlags() { flag.Int64Var(&opts.minReadOffset, "minimum_read_offset", 0, "minimum read offset in bytes") flag.Int64Var(&opts.maxReadOffset, "maximum_read_offset", 0, "maximum read offset in bytes") - flag.BoolVar(&opts.gcsFuse, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation") + flag.BoolVar(&opts.useGCSFuseConfig, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation") flag.IntVar(&opts.readBufferSize, "read_buffer_size", useDefault, "read buffer size in bytes") flag.IntVar(&opts.writeBufferSize, "write_buffer_size", useDefault, "write buffer size in bytes")