Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): fix flowcontrol refund on error (g…
Browse files Browse the repository at this point in the history
…oogleapis#9649)

Previously, connection's `lockingAppend` did not properly refund the
connection's flow controller if the send response errored.  This PR
addresses that issue, and includes a test to ensure the correct
behavior.

Fixes: googleapis#9540
  • Loading branch information
shollyman authored Mar 26, 2024
1 parent 1f7fb5c commit a07bf1d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
2 changes: 2 additions & 0 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
err = (*arc).Send(pw.constructFullRequest(true))
}
if err != nil {
// Refund the flow controller immediately, as there's nothing to refund on the receiver.
co.fc.release(pw.reqSize)
if shouldReconnect(err) {
metricCtx := co.ctx // start with the ctx that must be present
if pw.writer != nil {
Expand Down
52 changes: 52 additions & 0 deletions bigquery/storage/managedwriter/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,58 @@ func TestConnection_OpenWithRetry(t *testing.T) {
}
}

// Ensure we properly refund the flow control during send failures.
// https://github.com/googleapis/google-cloud-go/issues/9540
func TestConnection_LockingAppendFlowRelease(t *testing.T) {
ctx := context.Background()

pool := &connectionPool{
ctx: ctx,
baseFlowController: newFlowController(10, 0),
open: openTestArc(&testAppendRowsClient{},
func(req *storagepb.AppendRowsRequest) error {
// Append always reports EOF on send.
return io.EOF
}, nil),
}
router := newSimpleRouter("")
if err := pool.activateRouter(router); err != nil {
t.Errorf("activateRouter: %v", err)
}

writer := &ManagedStream{id: "foo", ctx: ctx}
if err := pool.addWriter(writer); err != nil {
t.Errorf("addWriter: %v", err)
}

pw := newPendingWrite(ctx, writer, &storagepb.AppendRowsRequest{WriteStream: "somestream"}, newVersionedTemplate(), "", "")
for i := 0; i < 5; i++ {
conn, err := router.pool.selectConn(pw)
if err != nil {
t.Errorf("selectConn: %v", err)
}

// Ensure FC is empty before lockingAppend
if got := conn.fc.count(); got != 0 {
t.Errorf("attempt %d expected empty flow count, got %d", i, got)
}
if got := conn.fc.bytes(); got != 0 {
t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
}
// invoke lockingAppend, which fails
if err := conn.lockingAppend(pw); err != io.EOF {
t.Errorf("lockingAppend attempt %d: expected io.EOF, got %v", i, err)
}
// Ensure we're refunded due to failure
if got := conn.fc.count(); got != 0 {
t.Errorf("attempt %d expected empty flow count, got %d", i, got)
}
if got := conn.fc.bytes(); got != 0 {
t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
}
}
}

// Ensures we don't lose track of channels/connections during reconnects.
// https://github.com/googleapis/google-cloud-go/issues/6766
func TestConnection_LeakingReconnect(t *testing.T) {
Expand Down

0 comments on commit a07bf1d

Please sign in to comment.