Skip to content

Commit

Permalink
fix(bigquery): avoid double-channel-close during context cancellation (
Browse files Browse the repository at this point in the history
…#7467)

EnableWriteRetries exposes an issue with finalizing a pending write.
The appendWithRetry function incorrectly marks a pending write done
in certain situations (context cancellations), when responsibility
is held by it's callers.

PR adds fix and a test that exercises the failure condition.

Fixes: https://togithub.com/googleapis/google-cloud-go/issues/7380
  • Loading branch information
shollyman authored Feb 22, 2023
1 parent 4810e8d commit ca4b2ef
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
5 changes: 3 additions & 2 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,9 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
}
continue
}
// Mark the pending write done. This will not be returned to the user, they'll receive the returned error.
pw.markDone(nil, appendErr, ms.fc)
// This append cannot be retried locally. It is not the responsibility of this function to finalize the pending
// write however, as that's handled by callers.
// Related: https://github.com/googleapis/google-cloud-go/issues/7380
return appendErr
}
return nil
Expand Down
49 changes: 49 additions & 0 deletions bigquery/storage/managedwriter/managed_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,52 @@ func TestManagedStream_Receiver(t *testing.T) {
cancel()
}
}

func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
// Issue: double close of pending write.
// https://github.com/googleapis/google-cloud-go/issues/7380
ctx, cancel := context.WithCancel(context.Background())

ms := &ManagedStream{
ctx: ctx,
open: openTestArc(&testAppendRowsClient{},
func(req *storagepb.AppendRowsRequest) error {
// Append doesn't error, but is slow.
time.Sleep(time.Second)
return nil
},
func() (*storagepb.AppendRowsResponse, error) {
// Response is slow and always returns a retriable error.
time.Sleep(2 * time.Second)
return nil, io.EOF
}),
streamSettings: defaultStreamSettings(),
fc: newFlowController(0, 0),
retry: newStatelessRetryer(),
schemaDescriptor: &descriptorpb.DescriptorProto{
Name: proto.String("testDescriptor"),
},
}

fakeData := [][]byte{
[]byte("foo"),
}

res, err := ms.AppendRows(context.Background(), fakeData)
if err != nil {
t.Errorf("AppendRows send err: %v", err)
}
cancel()

select {

case <-res.Ready():
if _, err := res.GetResult(context.Background()); err == nil {
t.Errorf("expected failure, got success")
}

case <-time.After(5 * time.Second):
t.Errorf("result was not ready in expected time")

}
}

0 comments on commit ca4b2ef

Please sign in to comment.