From 823db4e208e899d2537080970f0fab52c287ae9f Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 8 Sep 2020 12:11:07 -0400 Subject: [PATCH 1/3] Fail if FetchTagged partially retrieves results due to error --- .../server/tchannelthrift/node/service.go | 26 +++--- .../tchannelthrift/node/service_test.go | 90 +++++++++++++++++++ 2 files changed, 104 insertions(+), 12 deletions(-) diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 652b47c409..268c6cd767 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -683,9 +683,11 @@ func (s *service) fetchTagged(ctx context.Context, db storage.Database, req *rpc return nil, err } - // Step 2: If fetching data read the results of the asynchronuous block readers. + // Step 2: If fetching data read the results of the asynchronous block readers. if fetchData { - s.fetchReadResults(ctx, response, nsID, encodedDataResults) + if err := s.fetchReadResults(ctx, response, nsID, callStart, encodedDataResults); err != nil { + return nil, err + } } s.metrics.fetchTagged.ReportSuccess(s.nowFn().Sub(callStart)) @@ -740,7 +742,8 @@ func (s *service) fetchReadEncoded(ctx context.Context, encoded, err := db.ReadEncoded(ctx, nsID, tsID, opts.StartInclusive, opts.EndExclusive) if err != nil { - elem.Err = convert.ToRPCError(err) + s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) + return convert.ToRPCError(err) } else { encodedDataResults[idx] = encoded } @@ -748,11 +751,13 @@ func (s *service) fetchReadEncoded(ctx context.Context, return nil } -func (s *service) fetchReadResults(ctx context.Context, +func (s *service) fetchReadResults( + ctx context.Context, response *rpc.FetchTaggedResult_, nsID ident.ID, + callStart time.Time, encodedDataResults [][][]xio.BlockReader, -) { +) error { ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadResults) if sampled { sp.LogFields( @@ -762,19 +767,16 @@ func (s *service) fetchReadResults(ctx context.Context, } defer sp.Finish() - for idx, elem := range response.Elements { - if elem.Err != nil { - continue - } - + for idx := range response.Elements { segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedDataResults[idx]) if rpcErr != nil { - elem.Err = rpcErr - continue + s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) + return rpcErr } response.Elements[idx].Segments = segments } + return nil } func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 9e43ae2561..d4ee5a1025 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1879,6 +1879,96 @@ func TestServiceFetchTaggedErrs(t *testing.T) { require.Error(t, err) } +func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false) + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + mtr := mocktracer.New() + sp := mtr.StartSpan("root") + ctx.SetGoContext(opentracing.ContextWithSpan(gocontext.Background(), sp)) + + start := time.Now().Add(-2 * time.Hour) + end := start.Add(2 * time.Hour) + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + + nsID := "metrics" + + id := "foo" + s := []struct { + t time.Time + v float64 + }{ + {start.Add(10 * time.Second), 1.0}, + {start.Add(20 * time.Second), 2.0}, + } + enc := testStorageOpts.EncoderPool().Get() + enc.Reset(start, 0, nil) + for _, v := range s { + dp := ts.Datapoint{ + Timestamp: v.t, + Value: v.v, + } + require.NoError(t, enc.Encode(dp, xtime.Second, nil)) + } + + stream, _ := enc.Stream(ctx) + mockDB.EXPECT(). + ReadEncoded(gomock.Any(), ident.NewIDMatcher(nsID), ident.NewIDMatcher(id), start, end). + Return([][]xio.BlockReader{{ + xio.BlockReader{ + SegmentReader: stream, + }, + }}, fmt.Errorf("random err")) // Return error that should trigger failure of the entire call + + req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*")) + require.NoError(t, err) + qry := index.Query{Query: req} + + resMap := index.NewQueryResults(ident.StringID(nsID), + index.QueryResultsOptions{}, testIndexOptions) + resMap.Map().Set(ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags( + ident.StringTag("foo", "bar"), + ident.StringTag("baz", "dxk"), + ))) + + mockDB.EXPECT().QueryIDs( + gomock.Any(), + ident.NewIDMatcher(nsID), + index.NewQueryMatcher(qry), + index.QueryOptions{ + StartInclusive: start, + EndExclusive: end, + SeriesLimit: 10, + }).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil) + + startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) + require.NoError(t, err) + endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) + require.NoError(t, err) + var limit int64 = 10 + data, err := idx.Marshal(req) + require.NoError(t, err) + _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + Limit: &limit, + }) + require.Error(t, err) +} + func TestServiceAggregate(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() From 7258896ebc6073f3bf7706b7025352408ce1be8a Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 8 Sep 2020 14:09:12 -0400 Subject: [PATCH 2/3] PR feedback --- src/dbnode/network/server/tchannelthrift/node/service.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 268c6cd767..d55983b80e 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -680,12 +680,14 @@ func (s *service) fetchTagged(ctx context.Context, db storage.Database, req *rpc encodedDataResults = make([][][]xio.BlockReader, results.Size()) } if err := s.fetchReadEncoded(ctx, db, response, results, nsID, nsIDBytes, callStart, opts, fetchData, encodedDataResults); err != nil { + s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return nil, err } // Step 2: If fetching data read the results of the asynchronous block readers. if fetchData { - if err := s.fetchReadResults(ctx, response, nsID, callStart, encodedDataResults); err != nil { + if err := s.fetchReadResults(ctx, response, nsID, encodedDataResults); err != nil { + s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return nil, err } } @@ -725,7 +727,6 @@ func (s *service) fetchReadEncoded(ctx context.Context, ctx.RegisterFinalizer(enc) encodedTags, err := s.encodeTags(enc, tags) if err != nil { // This is an invariant, should never happen - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return tterrors.NewInternalError(err) } @@ -742,7 +743,6 @@ func (s *service) fetchReadEncoded(ctx context.Context, encoded, err := db.ReadEncoded(ctx, nsID, tsID, opts.StartInclusive, opts.EndExclusive) if err != nil { - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return convert.ToRPCError(err) } else { encodedDataResults[idx] = encoded @@ -755,7 +755,6 @@ func (s *service) fetchReadResults( ctx context.Context, response *rpc.FetchTaggedResult_, nsID ident.ID, - callStart time.Time, encodedDataResults [][][]xio.BlockReader, ) error { ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadResults) @@ -770,7 +769,6 @@ func (s *service) fetchReadResults( for idx := range response.Elements { segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedDataResults[idx]) if rpcErr != nil { - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return rpcErr } From 3344d2d9c0fbc52cd6383292e4a0b5552ff34978 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Thu, 10 Sep 2020 10:44:49 -0400 Subject: [PATCH 3/3] Add comment marking Err field as deprecated --- src/dbnode/generated/thrift/rpc.thrift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 9f1674a50c..6263377cc6 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -183,6 +183,8 @@ struct FetchTaggedIDResult { 2: required binary nameSpace 3: required binary encodedTags 4: optional list segments + + // Deprecated -- do not use. 5: optional Error err }