diff --git a/src/query/errors/storage.go b/src/query/errors/storage.go index 91c40810ab..dead6d6ebf 100644 --- a/src/query/errors/storage.go +++ b/src/query/errors/storage.go @@ -22,8 +22,6 @@ package errors import ( "errors" - - terrors "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/errors" ) var ( @@ -90,8 +88,4 @@ var ( // request returns an inconsistenent type. ErrInconsistentCompleteTagsType = errors.New("inconsistent complete tags" + " response type") - - // ErrSeriesLimit is an error returned when the series limit is exceeded and the caller required exhaustive results. - // This is wrapped in a ResourceExhaustedError so it behaves the same as an error returned from the db. - ErrSeriesLimit = terrors.NewResourceExhaustedError(errors.New("series limit exceeded")) ) diff --git a/src/query/storage/m3/consolidators/multi_fetch_result.go b/src/query/storage/m3/consolidators/multi_fetch_result.go index eee0314e7d..e6bff43bf0 100644 --- a/src/query/storage/m3/consolidators/multi_fetch_result.go +++ b/src/query/storage/m3/consolidators/multi_fetch_result.go @@ -21,11 +21,13 @@ package consolidators import ( + "errors" + "fmt" "sync" "github.com/m3db/m3/src/dbnode/encoding" + terrors "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/errors" "github.com/m3db/m3/src/query/block" - "github.com/m3db/m3/src/query/errors" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage/m3/storagemetadata" xerrors "github.com/m3db/m3/src/x/errors" @@ -202,9 +204,11 @@ func (r *multiResult) Add( return } + nsID := newIterators.Iters()[0].Namespace().String() + // the series limit was reached within this namespace. if !metadata.Exhaustive && r.limitOpts.RequireExhaustive { - r.err = r.err.Add(errors.ErrSeriesLimit) + r.err = r.err.Add(newSeriesLimitErr(fmt.Sprintf("series limit exceeded for namespace %s", nsID))) return } @@ -257,11 +261,18 @@ func (r *multiResult) Add( if !added && r.err.Empty() { r.metadata.Exhaustive = false if r.limitOpts.RequireExhaustive { - r.err = r.err.Add(errors.ErrSeriesLimit) + r.err = r.err.Add( + newSeriesLimitErr(fmt.Sprintf("series limit exceeded adding namespace %s to results", nsID))) } } } +func newSeriesLimitErr(msg string) error { + // wrap in a ResourceExhaustedError so it's the same type as the query limit error returned from a single database + // instance. + return terrors.NewResourceExhaustedError(errors.New(msg)) +} + func (r *multiResult) addOrUpdateDedupeMap( attrs storagemetadata.Attributes, newIterators encoding.SeriesIterators, diff --git a/src/query/storage/m3/consolidators/multi_fetch_result_test.go b/src/query/storage/m3/consolidators/multi_fetch_result_test.go index cce19d6d5b..9c0e1ead30 100644 --- a/src/query/storage/m3/consolidators/multi_fetch_result_test.go +++ b/src/query/storage/m3/consolidators/multi_fetch_result_test.go @@ -56,25 +56,32 @@ func generateSeriesIterators( ctrl *gomock.Controller, ns string) encoding.SeriesIterators { iter := encoding.NewMockSeriesIterator(ctrl) iter.EXPECT().ID().Return(ident.StringID(common)).MinTimes(1) - iter.EXPECT().Namespace().Return(ident.StringID(ns)).MaxTimes(1) + iter.EXPECT().Namespace().Return(ident.StringID(ns)).AnyTimes() iter.EXPECT().Tags().Return(ident.EmptyTagIterator).AnyTimes() unique := encoding.NewMockSeriesIterator(ctrl) unique.EXPECT().ID().Return(ident.StringID(ns)).MinTimes(1) - unique.EXPECT().Namespace().Return(ident.StringID(ns)).MaxTimes(1) + unique.EXPECT().Namespace().Return(ident.StringID(ns)).AnyTimes() unique.EXPECT().Tags().Return(ident.EmptyTagIterator).AnyTimes() iters := encoding.NewMockSeriesIterators(ctrl) iters.EXPECT().Close().Return().Times(1) iters.EXPECT().Len().Return(1).AnyTimes() - iters.EXPECT().Iters().Return([]encoding.SeriesIterator{iter, unique}) + iters.EXPECT().Iters().Return([]encoding.SeriesIterator{iter, unique}).AnyTimes() return iters } -func generateUnreadSeriesIterators(ctrl *gomock.Controller) encoding.SeriesIterators { +func generateUnreadSeriesIterators(ctrl *gomock.Controller, ns string) encoding.SeriesIterators { + iter := encoding.NewMockSeriesIterator(ctrl) + iter.EXPECT().Namespace().Return(ident.StringID(ns)).AnyTimes() + + unique := encoding.NewMockSeriesIterator(ctrl) + unique.EXPECT().Namespace().Return(ident.StringID(ns)).AnyTimes() + iters := encoding.NewMockSeriesIterators(ctrl) iters.EXPECT().Len().Return(1).AnyTimes() + iters.EXPECT().Iters().Return([]encoding.SeriesIterator{iter, unique}).AnyTimes() return iters } @@ -185,7 +192,7 @@ func TestLimit(t *testing.T) { r.Add(iters, meta, ns.attrs, nil) } longNs := namespaces[2] - r.Add(generateUnreadSeriesIterators(ctrl), meta, longNs.attrs, nil) + r.Add(generateUnreadSeriesIterators(ctrl, longNs.ns), meta, longNs.attrs, nil) result, err := r.FinalResult() assert.NoError(t, err) @@ -224,7 +231,7 @@ func TestLimitRequireExhaustive(t *testing.T) { r.Add(iters, meta, ns.attrs, nil) } longNs := namespaces[2] - r.Add(generateUnreadSeriesIterators(ctrl), meta, longNs.attrs, nil) + r.Add(generateUnreadSeriesIterators(ctrl, longNs.ns), meta, longNs.attrs, nil) _, err := r.FinalResult() require.Error(t, err) @@ -257,6 +264,7 @@ func TestExhaustiveMerge(t *testing.T) { iters := encoding.NewSeriesIterators([]encoding.SeriesIterator{ encoding.NewSeriesIterator(encoding.SeriesIteratorOptions{ ID: ident.StringID(fmt.Sprint(i)), + Namespace: ident.StringID("ns"), }, nil), }, nil)