Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): add GCSFuse client config [benchmarking] #8225

Merged
merged 7 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 59 additions & 20 deletions storage/internal/benchmarks/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"crypto/tls"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -87,7 +88,10 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {

nonBenchmarkingClients, closeNonBenchmarking = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, useDefault, useDefault, false)
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: useDefault,
readBufferSize: useDefault,
})
},
1,
)
Expand All @@ -96,7 +100,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
if opts.api == mixedAPIs || opts.api == xmlAPI {
xmlClients, closeXML = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, false)
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
useJSON: false,
setGCSFuseOpts: opts.gcsFuse,
})
},
opts.numClients,
)
Expand All @@ -108,7 +117,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
if opts.api == mixedAPIs || opts.api == jsonAPI || opts.api == xmlAPI {
jsonClients, closeJSON = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, true)
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
useJSON: true,
setGCSFuseOpts: opts.gcsFuse,
})
},
opts.numClients,
)
Expand All @@ -118,7 +132,11 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
if opts.api == mixedAPIs || opts.api == grpcAPI || opts.api == directPath {
gRPCClients, closeGRPC = newClientPool(
func() (*storage.Client, error) {
return initializeGRPCClient(context.Background(), opts.writeBufferSize, opts.readBufferSize, opts.connPoolSize)
return initializeGRPCClient(context.Background(), clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
connectionPoolSize: opts.connPoolSize,
})
},
opts.numClients,
)
Expand Down Expand Up @@ -156,20 +174,41 @@ func getClient(ctx context.Context, api benchmarkAPI) *storage.Client {
// mutex on starting a client so that we can set an env variable for GRPC clients
var clientMu sync.Mutex

func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, json bool) (*storage.Client, error) {
// Client config
type clientConfig struct {
writeBufferSize, readBufferSize int
useJSON bool // only applicable to HTTP Clients
setGCSFuseOpts bool // only applicable to HTTP Clients
connectionPoolSize int // only applicable to GRPC Clients
}

func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Client, error) {
opts := []option.ClientOption{}

if writeBufferSize != useDefault || readBufferSize != useDefault {
if config.writeBufferSize != useDefault || config.readBufferSize != useDefault || config.setGCSFuseOpts {
// We need to modify the underlying HTTP client

base := http.DefaultTransport.(*http.Transport).Clone()
base.MaxIdleConnsPerHost = 100 // this is set in Storage as well
base.WriteBufferSize = writeBufferSize
base.ReadBufferSize = readBufferSize

http2Trans, err := http2.ConfigureTransports(base)
if err == nil {
http2Trans.ReadIdleTimeout = time.Second * 31
if config.setGCSFuseOpts {
base = &http.Transport{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to create another object, since we are cloning the transport object in line:190. We can just override the attributes. Like,
base.MaxConnsPerHost = 100
base.MaxIdleConnsPerHost = 100
base.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep the exact initialization code as gcsFuse has. http.DefaultTransport configures several transport variables to default values, and I'm not sure manually initializing an http.Transport would have those configured the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
// This disables HTTP/2 in transport.
TLSNextProto: make(
map[string]func(string, *tls.Conn) http.RoundTripper,
),
}
}

base.MaxIdleConnsPerHost = 100 // this is set in Storage as well
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
base.WriteBufferSize = config.writeBufferSize
base.ReadBufferSize = config.readBufferSize

if !config.setGCSFuseOpts {
http2Trans, err := http2.ConfigureTransports(base)
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
http2Trans.ReadIdleTimeout = time.Second * 31
}
}

trans, err := htransport.NewTransport(ctx, base,
Expand All @@ -181,7 +220,7 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i
opts = append(opts, option.WithHTTPClient(&http.Client{Transport: trans}))
}

if json {
if config.useJSON {
opts = append(opts, storage.WithJSONReads())
}

Expand All @@ -193,14 +232,14 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i
return client, err
}

func initializeGRPCClient(ctx context.Context, writeBufferSize, readBufferSize int, connectionPoolSize int) (*storage.Client, error) {
opts := []option.ClientOption{option.WithGRPCConnectionPool(connectionPoolSize)}
func initializeGRPCClient(ctx context.Context, config clientConfig) (*storage.Client, error) {
opts := []option.ClientOption{option.WithGRPCConnectionPool(config.connectionPoolSize)}

if writeBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(writeBufferSize)))
if config.writeBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(config.writeBufferSize)))
}
if readBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(readBufferSize)))
if config.readBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(config.readBufferSize)))
}

clientMu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions storage/internal/benchmarks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type benchmarkOptions struct {
numClients int
workload int
numObjectsPerDirectory int

gcsFuse bool
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *benchmarkOptions) validate() error {
Expand Down Expand Up @@ -151,6 +153,8 @@ func parseFlags() {
flag.Int64Var(&opts.minReadOffset, "minimum_read_offset", 0, "minimum read offset in bytes")
flag.Int64Var(&opts.maxReadOffset, "maximum_read_offset", 0, "maximum read offset in bytes")

flag.BoolVar(&opts.gcsFuse, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation")

flag.IntVar(&opts.readBufferSize, "read_buffer_size", useDefault, "read buffer size in bytes")
flag.IntVar(&opts.writeBufferSize, "write_buffer_size", useDefault, "write buffer size in bytes")

Expand Down