Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): fix option propagation (#7669)
Browse files Browse the repository at this point in the history
  • Loading branch information
shollyman authored Mar 31, 2023
1 parent cf06802 commit f684e16
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 61 deletions.
22 changes: 4 additions & 18 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
}

// No existing pool available, create one for the location and add to shared pools.
pool, err := c.createPool(ctx, loc, nil, streamFunc)
pool, err := c.createPool(ctx, loc, streamFunc)
if err != nil {
return nil, err
}
Expand All @@ -227,7 +227,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
}

// createPool builds a connectionPool.
func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
func (c *Client) createPool(ctx context.Context, location string, streamFunc streamClientFunc) (*connectionPool, error) {
cCtx, cancel := context.WithCancel(ctx)

if c.cfg == nil {
Expand All @@ -238,29 +238,15 @@ func (c *Client) createPool(ctx context.Context, location string, settings *stre
// add location header to the retained pool context.
cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location))
}
fcRequests := c.cfg.defaultInflightRequests
fcBytes := c.cfg.defaultInflightBytes
arOpts := c.cfg.defaultAppendRowsCallOptions
if settings != nil {
if settings.MaxInflightRequests > 0 {
fcRequests = settings.MaxInflightRequests
}
if settings.MaxInflightBytes > 0 {
fcBytes = settings.MaxInflightBytes
}
for _, o := range settings.appendCallOptions {
arOpts = append(arOpts, o)
}
}

pool := &connectionPool{
id: newUUID(poolIDPrefix),
location: location,
ctx: cCtx,
cancel: cancel,
open: createOpenF(ctx, streamFunc),
callOptions: arOpts,
baseFlowController: newFlowController(fcRequests, fcBytes),
callOptions: c.cfg.defaultAppendRowsCallOptions,
baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
}
router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
if err := pool.activateRouter(router); err != nil {
Expand Down
63 changes: 35 additions & 28 deletions bigquery/storage/managedwriter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCreatePool_Location(t *testing.T) {
c := &Client{
cfg: &writerClientConfig{},
}
pool, err := c.createPool(context.Background(), "foo", nil, nil)
pool, err := c.createPool(context.Background(), "foo", nil)
if err != nil {
t.Fatalf("createPool: %v", err)
}
Expand Down Expand Up @@ -86,18 +86,14 @@ func TestCreatePool_Location(t *testing.T) {
// of global configuration and per-writer configuration.
func TestCreatePool(t *testing.T) {
testCases := []struct {
desc string
cfg *writerClientConfig
settings *streamSettings
wantMaxBytes int
wantMaxRequests int
wantCallOptions int
wantErr bool
desc string
cfg *writerClientConfig
settings *streamSettings
wantMaxBytes int
wantMaxRequests int
wantCallOptions int
wantPoolCallOptions int
}{
{
desc: "no config",
wantErr: true,
},
{
desc: "cfg, no settings",
cfg: &writerClientConfig{
Expand Down Expand Up @@ -130,9 +126,9 @@ func TestCreatePool(t *testing.T) {
MaxInflightRequests: 99,
MaxInflightBytes: 1024,
},
wantMaxBytes: 1024,
wantMaxRequests: 99,
wantCallOptions: 1,
wantMaxBytes: 1024,
wantMaxRequests: 99,
wantPoolCallOptions: 1,
},
{
desc: "merge defaults and settings",
Expand All @@ -145,36 +141,47 @@ func TestCreatePool(t *testing.T) {
MaxInflightBytes: 1024,
appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
},
wantMaxBytes: 1024,
wantMaxRequests: 123,
wantCallOptions: 2,
wantMaxBytes: 1024,
wantMaxRequests: 123,
wantCallOptions: 1,
wantPoolCallOptions: 1,
},
}

for _, tc := range testCases {
c := &Client{
cfg: tc.cfg,
}
got, err := c.createPool(context.Background(), "", tc.settings, nil)
pool, err := c.createPool(context.Background(), "", nil)
if err != nil {
if !tc.wantErr {
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
}
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
continue
}
if err == nil && tc.wantErr {
t.Errorf("case %q: expected createPool to error but it did not", tc.desc)
continue
writer := &ManagedStream{
id: "foo",
streamSettings: tc.settings,
}
if err = pool.addWriter(writer); err != nil {
t.Errorf("case %q: addWriter: %v", tc.desc, err)
}
pw := newPendingWrite(context.Background(), writer, nil, nil, "", "")
gotConn, err := pool.selectConn(pw)
if err != nil {
t.Errorf("case %q: selectConn: %v", tc.desc, err)
}

// too many go-cmp overrides needed to quickly diff here, look at the interesting fields explicitly.
if gotVal := got.baseFlowController.maxInsertBytes; gotVal != tc.wantMaxBytes {
if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes {
t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes)
}
if gotVal := got.baseFlowController.maxInsertCount; gotVal != tc.wantMaxRequests {
if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests {
t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests)
}
if gotVal := len(got.callOptions); gotVal != tc.wantCallOptions {
if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions {
t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions)
}
if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions {
t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions)
}
}
}
48 changes: 38 additions & 10 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type connectionPool struct {
// connection. Opening the connection is a stateless operation.
open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

// We specify one set of calloptions for the pool.
// All connections in the pool open with the same call options.
// We specify default calloptions for the pool.
// Explicit connections may have their own calloptions as well.
callOptions []gax.CallOption

router poolRouter // poolManager makes the decisions about connections and routing.
Expand Down Expand Up @@ -119,6 +119,16 @@ func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
return detachErr
}

func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
if co == nil {
return cp.callOptions
}
var mergedOpts []gax.CallOption
mergedOpts = append(mergedOpts, cp.callOptions...)
mergedOpts = append(mergedOpts, co.callOptions...)
return mergedOpts
}

// openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects
// when (re)opening the network connection to the backend.
//
Expand All @@ -127,7 +137,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
r := &unaryRetryer{}
for {
recordStat(cp.ctx, AppendClientOpenCount, 1)
arc, err := cp.open(cp.callOptions...)
arc, err := cp.open(cp.mergeCallOptions(co)...)
if err != nil {
bo, shouldRetry := r.Retry(err)
if shouldRetry {
Expand Down Expand Up @@ -172,9 +182,10 @@ type connection struct {
id string
pool *connectionPool // each connection retains a reference to its owning pool.

fc *flowController // each connection has it's own flow controller.
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
cancel context.CancelFunc
fc *flowController // each connection has it's own flow controller.
callOptions []gax.CallOption // custom calloptions for this connection.
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
cancel context.CancelFunc

retry *statelessRetryer
optimizer sendOptimizer
Expand All @@ -197,16 +208,32 @@ const (
verboseConnectionMode connectionMode = "VERBOSE"
)

func newConnection(pool *connectionPool, mode connectionMode) *connection {
func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection {
if pool == nil {
return nil
}
// create and retain a cancellable context.
connCtx, cancel := context.WithCancel(pool.ctx)
fc := newFlowController(0, 0)
if pool != nil {
fc = copyFlowController(pool.baseFlowController)

// Resolve local overrides for flow control and call options
fcRequests := 0
fcBytes := 0
var opts []gax.CallOption

if pool.baseFlowController != nil {
fcRequests = pool.baseFlowController.maxInsertCount
fcBytes = pool.baseFlowController.maxInsertBytes
}
if settings != nil {
if settings.MaxInflightRequests > 0 {
fcRequests = settings.MaxInflightRequests
}
if settings.MaxInflightBytes > 0 {
fcBytes = settings.MaxInflightBytes
}
opts = settings.appendCallOptions
}
fc := newFlowController(fcRequests, fcBytes)
countLimit, byteLimit := computeLoadThresholds(fc)

return &connection{
Expand All @@ -218,6 +245,7 @@ func newConnection(pool *connectionPool, mode connectionMode) *connection {
optimizer: optimizer(mode),
loadBytesThreshold: byteLimit,
loadCountThreshold: countLimit,
callOptions: opts,
}
}

Expand Down
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) {
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
},
}
conn := newConnection(pool, "")
conn := newConnection(pool, "", nil)
pool.openWithRetry(conn)
}

Expand Down
8 changes: 4 additions & 4 deletions bigquery/storage/managedwriter/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
defer rtr.mu.Unlock()
rtr.writers[writer.id] = struct{}{}
if rtr.conn == nil {
rtr.conn = newConnection(rtr.pool, rtr.mode)
rtr.conn = newConnection(rtr.pool, rtr.mode, nil)
}
return nil
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (sr *sharedRouter) writerAttach(writer *ManagedStream) error {
if pair := sr.exclusiveConns[writer.id]; pair != nil {
return fmt.Errorf("writer %q already attached", writer.id)
}
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode)
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings)
return nil
}

Expand Down Expand Up @@ -242,9 +242,9 @@ func (sr *sharedRouter) orderAndGrowMultiConns() {
return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad()
})
if len(sr.multiConns) == 0 {
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode)}
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}
} else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns {
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode)}, sr.multiConns...)
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...)
}
}

Expand Down

0 comments on commit f684e16

Please sign in to comment.