diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go
index 70abe0cd2ea1..a7a8cdd38ce9 100644
--- a/bigquery/storage_iterator.go
+++ b/bigquery/storage_iterator.go
@@ -244,17 +244,10 @@ func (it *storageArrowIterator) processStream(readStream string) {
 				Offset: offset,
 			})
 			if err != nil {
-				if it.rs.ctx.Err() != nil { // context cancelled, don't try again
+				serr := it.handleProcessStreamError(readStream, bo, err)
+				if serr != nil {
 					return
 				}
-				backoff, shouldRetry := retryReadRows(bo, err)
-				if shouldRetry {
-					if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
-						return // context cancelled
-					}
-					continue
-				}
-				it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
 				continue
 			}
 			offset, err = it.consumeRowStream(readStream, rowStream, offset)
@@ -262,22 +255,38 @@ func (it *storageArrowIterator) processStream(readStream string) {
 				return
 			}
 			if err != nil {
-				if it.rs.ctx.Err() != nil { // context cancelled, don't queue error
+				serr := it.handleProcessStreamError(readStream, bo, err)
+				if serr != nil {
 					return
 				}
-				backoff, shouldRetry := retryReadRows(bo, err)
-				if shouldRetry {
-					if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
-						return // context cancelled
-					}
-					continue
-				}
-				it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
 				// try to re-open row stream with updated offset
 			}
 		}
 	}
 }
 
+// handleProcessStreamError checks if err is retryable,
+// waiting with exponential backoff in that scenario.
+// If the error is not retryable, err is queued up to be sent to the user.
+// Returns a non-nil error when the goroutine should exit.
+func (it *storageArrowIterator) handleProcessStreamError(readStream string, bo gax.Backoff, err error) error {
+	if it.rs.ctx.Err() != nil { // context cancelled, don't try again
+		return it.rs.ctx.Err()
+	}
+	backoff, shouldRetry := retryReadRows(bo, err)
+	if shouldRetry {
+		if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
+			return err // context cancelled
+		}
+		return nil
+	}
+	select {
+	case it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err):
+		return nil
+	case <-it.rs.ctx.Done():
+		return context.Canceled
+	}
+}
+
 func retryReadRows(bo gax.Backoff, err error) (time.Duration, bool) {
 	s, ok := status.FromError(err)
 	if !ok {
diff --git a/bigquery/storage_iterator_test.go b/bigquery/storage_iterator_test.go
index 33dc77201dcc..5e83c7c4729f 100644
--- a/bigquery/storage_iterator_test.go
+++ b/bigquery/storage_iterator_test.go
@@ -29,6 +29,11 @@ import (
 )
 
 func TestStorageIteratorRetry(t *testing.T) {
+	settings := defaultReadClientSettings()
+	randomErrors := []error{} // generate more errors than the # of workers
+	for i := 0; i < settings.maxWorkerCount+2; i++ {
+		randomErrors = append(randomErrors, fmt.Errorf("random error %d", i))
+	}
 	cancelledCtx, cancel := context.WithCancel(context.Background())
 	cancel()
 	testCases := []struct {
@@ -84,6 +89,11 @@ func TestStorageIteratorRetry(t *testing.T) {
 			},
 			wantFail: true,
 		},
+		{
+			desc:     "filled with non-retryable errors and context cancelled",
+			errors:   randomErrors,
+			wantFail: true,
+		},
 	}
 	for _, tc := range testCases {
 		rrc := &testReadRowsClient{
@@ -115,9 +125,9 @@ func TestStorageIteratorRetry(t *testing.T) {
 		it, err := newRawStorageRowIterator(&readSession{
 			ctx:          ctx,
-			settings:     defaultReadClientSettings(),
-			readRowsFunc: readRowsFunc,
+			settings:     settings,
 			bqSession:    &storagepb.ReadSession{},
+			readRowsFunc: readRowsFunc,
 		}, Schema{})
 		if err != nil {
 			t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
 		}
@@ -125,7 +135,8 @@ func TestStorageIteratorRetry(t *testing.T) {
 
 		it.processStream("test-stream")
 
-		if errors.Is(it.rs.ctx.Err(), context.Canceled) || errors.Is(it.rs.ctx.Err(), context.DeadlineExceeded) {
+		if errors.Is(it.rs.ctx.Err(), context.Canceled) ||
+			errors.Is(it.rs.ctx.Err(), context.DeadlineExceeded) {
 			if tc.wantFail {
 				continue
 			}
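The behavioral change worth noting is the `select` in `handleProcessStreamError`: the old bare `it.errs <- ...` send could block a worker goroutine forever once the consumer stopped draining the error channel and cancelled the context, which is exactly the scenario the new "filled with non-retryable errors and context cancelled" test case exercises. Below is a minimal standalone sketch of that pattern; the `queueErr` helper name and the unbuffered channel are illustrative, not taken from the patch:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// queueErr mirrors the select in handleProcessStreamError: deliver err on
// errs unless the consumer's context is done first, in which case the
// caller should exit instead of blocking on the send forever.
func queueErr(ctx context.Context, errs chan<- error, err error) error {
	select {
	case errs <- err:
		return nil // delivered; caller may keep processing
	case <-ctx.Done():
		return context.Canceled // consumer is gone; caller should exit
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	errs := make(chan error) // unbuffered, like a channel no one is draining
	cancel()                 // consumer has already gone away

	// A plain `errs <- err` here would block this goroutine permanently.
	if err := queueErr(ctx, errs, errors.New("read failed")); errors.Is(err, context.Canceled) {
		fmt.Println("worker exits cleanly instead of leaking")
	}
}
```

The test reinforces this by generating more non-retryable errors than there are workers (`settings.maxWorkerCount+2`), so that without the `ctx.Done()` case at least one sender would be stuck on a full channel after cancellation.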