Commit
fix(bigquery): processStream check ctx done when queuing non retryable err (#10675)
alvarowolfx authored Aug 13, 2024
1 parent 1afb9ee commit 60ad7f3
Showing 2 changed files with 41 additions and 21 deletions.
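
The heart of the fix is a standard Go pattern. processStream runs as a producer goroutine that queues errors on it.errs; if the consumer has already gone away after context cancellation, a bare channel send blocks forever and leaks the goroutine. Wrapping the send in a select that also watches ctx.Done() lets the goroutine give up and exit instead. A minimal standalone sketch of the pattern (queueErr and its surroundings are illustrative names, not the library's actual identifiers):

package main

import (
	"context"
	"errors"
	"fmt"
)

// queueErr tries to deliver err on errs, but gives up if ctx is
// cancelled first. Without the ctx.Done() case, a send on a full or
// abandoned channel would block this goroutine forever.
func queueErr(ctx context.Context, errs chan<- error, err error) error {
	select {
	case errs <- err:
		return nil
	case <-ctx.Done():
		return context.Canceled
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	errs := make(chan error) // unbuffered, and nobody is receiving
	cancel()                 // the consumer is gone

	if err := queueErr(ctx, errs, errors.New("non-retryable")); err != nil {
		fmt.Println("gave up queuing:", err) // gave up queuing: context canceled
	}
}

This is exactly the shape of the select added in handleProcessStreamError in the diff below.
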
45 changes: 27 additions & 18 deletions bigquery/storage_iterator.go
@@ -244,40 +244,49 @@ func (it *storageArrowIterator) processStream(readStream string) {
 			Offset: offset,
 		})
 		if err != nil {
-			if it.rs.ctx.Err() != nil { // context cancelled, don't try again
+			serr := it.handleProcessStreamError(readStream, bo, err)
+			if serr != nil {
 				return
 			}
-			backoff, shouldRetry := retryReadRows(bo, err)
-			if shouldRetry {
-				if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
-					return // context cancelled
-				}
-				continue
-			}
-			it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
 			continue
 		}
 		offset, err = it.consumeRowStream(readStream, rowStream, offset)
 		if errors.Is(err, io.EOF) {
 			return
 		}
 		if err != nil {
-			if it.rs.ctx.Err() != nil { // context cancelled, don't queue error
+			serr := it.handleProcessStreamError(readStream, bo, err)
+			if serr != nil {
 				return
 			}
-			backoff, shouldRetry := retryReadRows(bo, err)
-			if shouldRetry {
-				if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
-					return // context cancelled
-				}
-				continue
-			}
-			it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
 			// try to re-open row stream with updated offset
 		}
 	}
 }
 
+// handleProcessStreamError check if err is retryable,
+// waiting with exponential backoff in that scenario.
+// If error is not retryable, queue up err to be sent to user.
+// Return error if should exit the goroutine.
+func (it *storageArrowIterator) handleProcessStreamError(readStream string, bo gax.Backoff, err error) error {
+	if it.rs.ctx.Err() != nil { // context cancelled, don't try again
+		return it.rs.ctx.Err()
+	}
+	backoff, shouldRetry := retryReadRows(bo, err)
+	if shouldRetry {
+		if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
+			return err // context cancelled
+		}
+		return nil
+	}
+	select {
+	case it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err):
+		return nil
+	case <-it.rs.ctx.Done():
+		return context.Canceled
+	}
+}
+
 func retryReadRows(bo gax.Backoff, err error) (time.Duration, bool) {
 	s, ok := status.FromError(err)
 	if !ok {
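
For reference, gax.Backoff and gax.Sleep come from github.com/googleapis/gax-go/v2: Backoff.Pause() returns the next exponentially growing delay, and Sleep(ctx, d) waits for d but returns the context's error early if ctx is cancelled mid-wait. A hedged sketch of how those pieces compose, mirroring the shape of handleProcessStreamError (isRetryable and callWithRetry are stand-ins, not the package's real retryReadRows logic):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	gax "github.com/googleapis/gax-go/v2"
)

var errTransient = errors.New("transient")

// isRetryable stands in for retryReadRows, which classifies errors by
// gRPC status code; any predicate works for this sketch.
func isRetryable(err error) bool { return errors.Is(err, errTransient) }

// callWithRetry retries retryable errors with exponential backoff,
// stops early on context cancellation, and surfaces anything else.
func callWithRetry(ctx context.Context, attempt func() error) error {
	bo := gax.Backoff{Initial: 100 * time.Millisecond, Max: 5 * time.Second, Multiplier: 2}
	for {
		err := attempt()
		if err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err
		}
		if serr := gax.Sleep(ctx, bo.Pause()); serr != nil {
			return serr // context cancelled mid-backoff
		}
	}
}

func main() {
	n := 0
	err := callWithRetry(context.Background(), func() error {
		n++
		if n < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("err:", err, "attempts:", n) // err: <nil> attempts: 3
}
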
17 changes: 14 additions & 3 deletions bigquery/storage_iterator_test.go
@@ -29,6 +29,11 @@ import (
 )
 
 func TestStorageIteratorRetry(t *testing.T) {
+	settings := defaultReadClientSettings()
+	randomErrors := []error{} // generate more errors than the # of workers
+	for i := 0; i < settings.maxWorkerCount+2; i++ {
+		randomErrors = append(randomErrors, fmt.Errorf("random error %d", i))
+	}
 	cancelledCtx, cancel := context.WithCancel(context.Background())
 	cancel()
 	testCases := []struct {
@@ -84,6 +89,11 @@ func TestStorageIteratorRetry(t *testing.T) {
 			},
 			wantFail: true,
 		},
+		{
+			desc:     "filled with non-retryable errors and context cancelled",
+			errors:   randomErrors,
+			wantFail: true,
+		},
 	}
 	for _, tc := range testCases {
 		rrc := &testReadRowsClient{
@@ -115,17 +125,18 @@ func TestStorageIteratorRetry(t *testing.T) {
 
 		it, err := newRawStorageRowIterator(&readSession{
 			ctx:          ctx,
-			settings:     defaultReadClientSettings(),
-			readRowsFunc: readRowsFunc,
+			settings:     settings,
 			bqSession:    &storagepb.ReadSession{},
+			readRowsFunc: readRowsFunc,
 		}, Schema{})
 		if err != nil {
 			t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
 		}
 
 		it.processStream("test-stream")
 
-		if errors.Is(it.rs.ctx.Err(), context.Canceled) || errors.Is(it.rs.ctx.Err(), context.DeadlineExceeded) {
+		if errors.Is(it.rs.ctx.Err(), context.Canceled) ||
+			errors.Is(it.rs.ctx.Err(), context.DeadlineExceeded) {
 			if tc.wantFail {
 				continue
 			}
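
The new test case targets the hang directly: it queues settings.maxWorkerCount+2 non-retryable errors against an already-cancelled context, presumably more than the iterator's error channel can absorb, so the old unguarded send would have blocked. A self-contained sketch of why the overfill matters (the worker count and buffer size here are assumptions for illustration, not the iterator's real internals):

package main

import (
	"context"
	"fmt"
)

func main() {
	const workers = 4 // stands in for settings.maxWorkerCount
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // reproduce the test's cancelled-context condition

	errs := make(chan error, workers) // hypothetical buffer size
	queued, abandoned := 0, 0
	for i := 0; i < workers+2; i++ { // more errors than the buffer holds
		select {
		case errs <- fmt.Errorf("random error %d", i):
			queued++ // while the buffer has room, either ready case may fire
		case <-ctx.Done():
			abandoned++ // an unguarded send would block here once full
		}
	}
	// All iterations finish; without the ctx.Done() guard the loop
	// would deadlock as soon as the buffer filled.
	fmt.Println("queued:", queued, "abandoned:", abandoned)
}
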
