diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 652b47c409..268c6cd767 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -683,9 +683,11 @@ func (s *service) fetchTagged(ctx context.Context, db storage.Database, req *rpc return nil, err } - // Step 2: If fetching data read the results of the asynchronuous block readers. + // Step 2: If fetching data read the results of the asynchronous block readers. if fetchData { - s.fetchReadResults(ctx, response, nsID, encodedDataResults) + if err := s.fetchReadResults(ctx, response, nsID, callStart, encodedDataResults); err != nil { + return nil, err + } } s.metrics.fetchTagged.ReportSuccess(s.nowFn().Sub(callStart)) @@ -740,7 +742,8 @@ func (s *service) fetchReadEncoded(ctx context.Context, encoded, err := db.ReadEncoded(ctx, nsID, tsID, opts.StartInclusive, opts.EndExclusive) if err != nil { - elem.Err = convert.ToRPCError(err) + s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) + return convert.ToRPCError(err) } else { encodedDataResults[idx] = encoded } @@ -748,11 +751,13 @@ func (s *service) fetchReadEncoded(ctx context.Context, return nil } -func (s *service) fetchReadResults(ctx context.Context, +func (s *service) fetchReadResults( + ctx context.Context, response *rpc.FetchTaggedResult_, nsID ident.ID, + callStart time.Time, encodedDataResults [][][]xio.BlockReader, -) { +) error { ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadResults) if sampled { sp.LogFields( @@ -762,19 +767,16 @@ func (s *service) fetchReadResults(ctx context.Context, } defer sp.Finish() - for idx, elem := range response.Elements { - if elem.Err != nil { - continue - } - + for idx := range response.Elements { segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedDataResults[idx]) if rpcErr != nil { - elem.Err = rpcErr - continue + s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) + return rpcErr } response.Elements[idx].Segments = segments } + return nil } func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 9e43ae2561..d4ee5a1025 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1879,6 +1879,96 @@ func TestServiceFetchTaggedErrs(t *testing.T) { require.Error(t, err) } +func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false) + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + mtr := mocktracer.New() + sp := mtr.StartSpan("root") + ctx.SetGoContext(opentracing.ContextWithSpan(gocontext.Background(), sp)) + + start := time.Now().Add(-2 * time.Hour) + end := start.Add(2 * time.Hour) + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + + nsID := "metrics" + + id := "foo" + s := []struct { + t time.Time + v float64 + }{ + {start.Add(10 * time.Second), 1.0}, + {start.Add(20 * time.Second), 2.0}, + } + enc := testStorageOpts.EncoderPool().Get() + enc.Reset(start, 0, nil) + for _, v := range s { + dp := ts.Datapoint{ + Timestamp: v.t, + Value: v.v, + } + require.NoError(t, enc.Encode(dp, xtime.Second, nil)) + } + + stream, _ := enc.Stream(ctx) + mockDB.EXPECT(). + ReadEncoded(gomock.Any(), ident.NewIDMatcher(nsID), ident.NewIDMatcher(id), start, end). + Return([][]xio.BlockReader{{ + xio.BlockReader{ + SegmentReader: stream, + }, + }}, fmt.Errorf("random err")) // Return error that should trigger failure of the entire call + + req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*")) + require.NoError(t, err) + qry := index.Query{Query: req} + + resMap := index.NewQueryResults(ident.StringID(nsID), + index.QueryResultsOptions{}, testIndexOptions) + resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags( + ident.StringTag("foo", "bar"), + ident.StringTag("baz", "dxk"), + ))) + + mockDB.EXPECT().QueryIDs( + gomock.Any(), + ident.NewIDMatcher(nsID), + index.NewQueryMatcher(qry), + index.QueryOptions{ + StartInclusive: start, + EndExclusive: end, + SeriesLimit: 10, + }).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil) + + startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) + require.NoError(t, err) + endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) + require.NoError(t, err) + var limit int64 = 10 + data, err := idx.Marshal(req) + require.NoError(t, err) + _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + Limit: &limit, + }) + require.Error(t, err) +} + func TestServiceAggregate(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish()