diff --git a/bigquery/storage_iterator.go b/bigquery/storage_iterator.go index c0ef9fedfb37..ae5ae2a66d17 100644 --- a/bigquery/storage_iterator.go +++ b/bigquery/storage_iterator.go @@ -270,6 +270,14 @@ func (it *arrowIterator) processStream(readStream string) { if it.session.ctx.Err() != nil { // context cancelled, don't queue error return } + backoff, shouldRetry := retryReadRows(bo, err) + if shouldRetry { + if err := gax.Sleep(it.ctx, backoff); err != nil { + return // context cancelled + } + continue + } + it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err) // try to re-open row stream with updated offset } } diff --git a/bigquery/storage_iterator_test.go b/bigquery/storage_iterator_test.go index 03293af069bc..75e0d87973a1 100644 --- a/bigquery/storage_iterator_test.go +++ b/bigquery/storage_iterator_test.go @@ -39,7 +39,7 @@ func TestStorageIteratorRetry(t *testing.T) { }{ { desc: "no error", - errors: []error{nil}, + errors: []error{}, wantFail: false, }, { @@ -49,10 +49,16 @@ func TestStorageIteratorRetry(t *testing.T) { status.Errorf(codes.Unavailable, "try 2"), status.Errorf(codes.Canceled, "try 3"), status.Errorf(codes.Internal, "try 4"), - nil, }, wantFail: false, }, + { + desc: "not enough permission", + errors: []error{ + status.Errorf(codes.PermissionDenied, "the user does not have 'bigquery.readsessions.getData' permission"), + }, + wantFail: true, + }, { desc: "permanent error", errors: []error{ @@ -71,18 +77,12 @@ func TestStorageIteratorRetry(t *testing.T) { wantFail: true, }, } - for _, tc := range testCases { - baseCtx := tc.ctx - if baseCtx == nil { - baseCtx = context.Background() + rrc := &testReadRowsClient{ + errors: tc.errors, } - ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second) - defer cancel() - it, err := newRawStorageRowIterator(&readSession{ - ctx: ctx, - settings: defaultReadClientSettings(), - readRowsFunc: func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) { + readRowsFuncs := map[string]func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error){ + "readRows fail on first call": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) { if len(tc.errors) == 0 { return &testReadRowsClient{}, nil } @@ -93,25 +93,42 @@ func TestStorageIteratorRetry(t *testing.T) { } return &testReadRowsClient{}, nil }, - bqSession: &storagepb.ReadSession{}, - }) - if err != nil { - t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err) + "readRows fails on Recv": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) { + return rrc, nil + }, } + for readRowsFuncType, readRowsFunc := range readRowsFuncs { + baseCtx := tc.ctx + if baseCtx == nil { + baseCtx = context.Background() + } + ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second) + defer cancel() - it.processStream("test-stream") + it, err := newRawStorageRowIterator(&readSession{ + ctx: ctx, + settings: defaultReadClientSettings(), + readRowsFunc: readRowsFunc, + bqSession: &storagepb.ReadSession{}, + }) + if err != nil { + t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err) + } + + it.processStream("test-stream") - if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) { - if tc.wantFail { - continue + if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) { + if tc.wantFail { + continue + } + t.Fatalf("case %s(%s): deadline exceeded", tc.desc, readRowsFuncType) + } + if tc.wantFail && len(it.errs) == 0 { + t.Fatalf("case %s(%s):want test to fail, but found no errors", tc.desc, readRowsFuncType) + } + if !tc.wantFail && len(it.errs) > 0 { + t.Fatalf("case %s(%s):test should not fail, but found %d errors", tc.desc, readRowsFuncType, len(it.errs)) } - t.Fatalf("case %s: deadline exceeded", tc.desc) - } - if tc.wantFail && len(it.errs) == 0 { - t.Fatalf("case %s:want test to fail, but found no errors", tc.desc) - } - if !tc.wantFail && len(it.errs) > 0 { - t.Fatalf("case %s:test should not fail, but found %d errors", tc.desc, len(it.errs)) } } } @@ -119,13 +136,19 @@ func TestStorageIteratorRetry(t *testing.T) { type testReadRowsClient struct { storagepb.BigQueryRead_ReadRowsClient responses []*storagepb.ReadRowsResponse + errors []error } func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) { - if len(trrc.responses) == 0 { - return nil, io.EOF + if len(trrc.errors) > 0 { + err := trrc.errors[0] + trrc.errors = trrc.errors[1:] + return nil, err + } + if len(trrc.responses) > 0 { + r := trrc.responses[0] + trrc.responses = trrc.responses[:1] + return r, nil } - r := trrc.responses[0] - trrc.responses = trrc.responses[:1] - return r, nil + return nil, io.EOF }