Skip to content

Commit

Permalink
[dbnode/query] Limit docs per query (#2415)
Browse files — browse the repository at this point in the history
  • Loading branch information
rallen090 authored Jun 23, 2020
1 parent c41e28e commit 6f05354
Show file tree
Hide file tree
Showing 42 changed files with 652 additions and 189 deletions.
22 changes: 15 additions & 7 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ var (

defaultCarbonIngesterAggregationType = aggregation.Mean

defaultStorageQueryLimit = 10000
defaultStorageQuerySeriesLimit = 10000
defaultStorageQueryDocsLimit = 0 // Default OFF.
)

// Configuration is the configuration for the query service.
Expand Down Expand Up @@ -308,6 +309,9 @@ type PerQueryLimitsConfiguration struct {
// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int `yaml:"maxFetchedSeries"`

// MaxFetchedDocs limits the number of index documents matched by a query.
MaxFetchedDocs int `yaml:"maxFetchedDocs"`

// RequireExhaustive results in an error if the query exceeds the series limit.
RequireExhaustive bool `yaml:"requireExhaustive"`
}
Expand All @@ -321,15 +325,19 @@ func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerO
// AsFetchOptionsBuilderOptions converts this configuration to
// handler.FetchOptionsBuilderOptions.
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleroptions.FetchOptionsBuilderOptions {
if l.MaxFetchedSeries <= 0 {
return handleroptions.FetchOptionsBuilderOptions{
Limit: defaultStorageQueryLimit,
RequireExhaustive: l.RequireExhaustive,
}
seriesLimit := defaultStorageQuerySeriesLimit
if v := l.MaxFetchedSeries; v > 0 {
seriesLimit = v
}

docsLimit := defaultStorageQueryDocsLimit
if v := l.MaxFetchedDocs; v > 0 {
docsLimit = v
}

return handleroptions.FetchOptionsBuilderOptions{
Limit: int(l.MaxFetchedSeries),
SeriesLimit: int(seriesLimit),
DocsLimit: int(docsLimit),
RequireExhaustive: l.RequireExhaustive,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/generated/thrift/rpc.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ struct FetchTaggedRequest {
6: optional i64 limit
7: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS
8: optional bool requireExhaustive = false
9: optional i64 docsLimit
}

struct FetchTaggedResult {
Expand Down
46 changes: 46 additions & 0 deletions src/dbnode/generated/thrift/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,6 +3050,7 @@ func (p *Segment) String() string {
// - Limit
// - RangeTimeType
// - RequireExhaustive
// - DocsLimit
type FetchTaggedRequest struct {
NameSpace []byte `thrift:"nameSpace,1,required" db:"nameSpace" json:"nameSpace"`
Query []byte `thrift:"query,2,required" db:"query" json:"query"`
Expand All @@ -3059,6 +3060,7 @@ type FetchTaggedRequest struct {
Limit *int64 `thrift:"limit,6" db:"limit" json:"limit,omitempty"`
RangeTimeType TimeType `thrift:"rangeTimeType,7" db:"rangeTimeType" json:"rangeTimeType,omitempty"`
RequireExhaustive bool `thrift:"requireExhaustive,8" db:"requireExhaustive" json:"requireExhaustive,omitempty"`
DocsLimit *int64 `thrift:"docsLimit,9" db:"docsLimit" json:"docsLimit,omitempty"`
}

func NewFetchTaggedRequest() *FetchTaggedRequest {
Expand Down Expand Up @@ -3107,6 +3109,15 @@ var FetchTaggedRequest_RequireExhaustive_DEFAULT bool = false
// GetRequireExhaustive returns the requireExhaustive flag. Unlike the
// pointer-typed optional fields, this field is a plain bool, so an unset
// field is indistinguishable from an explicit false (the thrift default).
func (p *FetchTaggedRequest) GetRequireExhaustive() bool {
	return p.RequireExhaustive
}

// FetchTaggedRequest_DocsLimit_DEFAULT is the value reported for the
// optional docsLimit field when it was never set on the wire.
var FetchTaggedRequest_DocsLimit_DEFAULT int64

// GetDocsLimit returns the docsLimit field's value when it was explicitly
// set, and the type default (zero) otherwise.
func (p *FetchTaggedRequest) GetDocsLimit() int64 {
	if p.IsSetDocsLimit() {
		return *p.DocsLimit
	}
	return FetchTaggedRequest_DocsLimit_DEFAULT
}
// IsSetLimit reports whether the optional limit field was explicitly
// provided on this request (a nil pointer means it was absent).
func (p *FetchTaggedRequest) IsSetLimit() bool {
	return p.Limit != nil
}
Expand All @@ -3119,6 +3130,10 @@ func (p *FetchTaggedRequest) IsSetRequireExhaustive() bool {
return p.RequireExhaustive != FetchTaggedRequest_RequireExhaustive_DEFAULT
}

// IsSetDocsLimit reports whether the optional docsLimit field was
// explicitly provided on this request (a nil pointer means it was absent).
func (p *FetchTaggedRequest) IsSetDocsLimit() bool {
	return p.DocsLimit != nil
}

func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
Expand Down Expand Up @@ -3176,6 +3191,10 @@ func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error {
if err := p.ReadField8(iprot); err != nil {
return err
}
case 9:
if err := p.ReadField9(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
Expand Down Expand Up @@ -3279,6 +3298,15 @@ func (p *FetchTaggedRequest) ReadField8(iprot thrift.TProtocol) error {
return nil
}

// ReadField9 decodes the optional docsLimit field (id 9, i64) from the
// protocol, marking it as set by storing a pointer to the decoded value.
func (p *FetchTaggedRequest) ReadField9(iprot thrift.TProtocol) error {
	v, err := iprot.ReadI64()
	if err != nil {
		return thrift.PrependError("error reading field 9: ", err)
	}
	p.DocsLimit = &v
	return nil
}

func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("FetchTaggedRequest"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
Expand Down Expand Up @@ -3308,6 +3336,9 @@ func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error {
if err := p.writeField8(oprot); err != nil {
return err
}
if err := p.writeField9(oprot); err != nil {
return err
}
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
Expand Down Expand Up @@ -3428,6 +3459,21 @@ func (p *FetchTaggedRequest) writeField8(oprot thrift.TProtocol) (err error) {
return err
}

// writeField9 emits the optional docsLimit field (id 9, i64) onto the
// protocol. Unset fields are skipped entirely, preserving wire
// compatibility with readers that do not know field 9.
func (p *FetchTaggedRequest) writeField9(oprot thrift.TProtocol) (err error) {
	if !p.IsSetDocsLimit() {
		return nil
	}
	if err := oprot.WriteFieldBegin("docsLimit", thrift.I64, 9); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T write field begin error 9:docsLimit: ", p), err)
	}
	if err := oprot.WriteI64(*p.DocsLimit); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T.docsLimit (9) field write error: ", p), err)
	}
	if err := oprot.WriteFieldEnd(); err != nil {
		return thrift.PrependError(fmt.Sprintf("%T write field end error 9:docsLimit: ", p), err)
	}
	return nil
}

func (p *FetchTaggedRequest) String() string {
if p == nil {
return "<nil>"
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/integration/fetch_tagged_quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func makeTestFetchTagged(
index.QueryOptions{
StartInclusive: startTime.Add(-time.Minute),
EndExclusive: startTime.Add(time.Minute),
Limit: 100,
SeriesLimit: 100,
})
return series, metadata.Exhaustive, err
}
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/integration/index_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (w TestIndexWrites) NumIndexed(t *testing.T, ns ident.ID, s client.Session)
iter, _, err := s.FetchTaggedIDs(ns, index.Query{Query: q}, index.QueryOptions{
StartInclusive: wi.ts.Add(-1 * time.Second),
EndExclusive: wi.ts.Add(1 * time.Second),
Limit: 10})
SeriesLimit: 10})
if err != nil {
continue
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func isIndexed(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags id
iter, _, err := s.FetchTaggedIDs(ns, index.Query{Query: q}, index.QueryOptions{
StartInclusive: time.Now(),
EndExclusive: time.Now(),
Limit: 10})
SeriesLimit: 10})
if err != nil {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR
EndExclusive: end,
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}

session, err := s.session()
Expand Down
22 changes: 15 additions & 7 deletions src/dbnode/network/server/tchannelthrift/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ func FromRPCFetchTaggedRequest(
RequireExhaustive: req.RequireExhaustive,
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}
if l := req.DocsLimit; l != nil {
opts.DocsLimit = int(*l)
}

q, err := idx.Unmarshal(req.Query)
Expand Down Expand Up @@ -273,11 +276,16 @@ func ToRPCFetchTaggedRequest(
RequireExhaustive: opts.RequireExhaustive,
}

if opts.Limit > 0 {
l := int64(opts.Limit)
if opts.SeriesLimit > 0 {
l := int64(opts.SeriesLimit)
request.Limit = &l
}

if opts.DocsLimit > 0 {
l := int64(opts.DocsLimit)
request.DocsLimit = &l
}

return request, nil
}

Expand All @@ -302,7 +310,7 @@ func FromRPCAggregateQueryRequest(
},
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}

query, err := FromRPCQuery(req.Query)
Expand Down Expand Up @@ -347,7 +355,7 @@ func FromRPCAggregateQueryRawRequest(
},
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}

query, err := idx.Unmarshal(req.Query)
Expand Down Expand Up @@ -394,8 +402,8 @@ func ToRPCAggregateQueryRawRequest(
RangeEnd: rangeEnd,
}

if opts.Limit > 0 {
l := int64(opts.Limit)
if opts.SeriesLimit > 0 {
l := int64(opts.SeriesLimit)
request.Limit = &l
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestConvertFetchTaggedRequest(t *testing.T) {
opts := index.QueryOptions{
StartInclusive: time.Now().Add(-900 * time.Hour),
EndExclusive: time.Now(),
Limit: 10,
SeriesLimit: 10,
}
fetchData := true
var limit int64 = 10
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestConvertAggregateRawQueryRequest(t *testing.T) {
QueryOptions: index.QueryOptions{
StartInclusive: time.Now().Add(-900 * time.Hour),
EndExclusive: time.Now(),
Limit: 10,
SeriesLimit: 10,
},
Type: index.AggregateTagNamesAndValues,
FieldFilter: index.AggregateFieldFilter{
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query
EndExclusive: end,
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}
queryResult, err := db.QueryIDs(ctx, nsID, index.Query{Query: q}, opts)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestServiceQuery(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

limit := int64(10)
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestServiceQueryUnknownErr(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{}, unknownErr)

limit := int64(10)
Expand Down Expand Up @@ -1596,7 +1596,7 @@ func TestServiceFetchTagged(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
Expand Down Expand Up @@ -1792,7 +1792,7 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
Expand Down Expand Up @@ -1865,7 +1865,7 @@ func TestServiceFetchTaggedErrs(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{}, fmt.Errorf("random err"))
_, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{
NameSpace: []byte(nsID),
Expand Down Expand Up @@ -1915,7 +1915,7 @@ func TestServiceAggregate(t *testing.T) {
QueryOptions: index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
},
FieldFilter: index.AggregateFieldFilter{
[]byte("foo"), []byte("bar"),
Expand Down Expand Up @@ -1998,7 +1998,7 @@ func TestServiceAggregateNameOnly(t *testing.T) {
QueryOptions: index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
},
FieldFilter: index.AggregateFieldFilter{
[]byte("foo"), []byte("bar"),
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,8 @@ func (d *db) QueryIDs(
sp.LogFields(
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", namespace.String()),
opentracinglog.Int("limit", opts.Limit),
opentracinglog.Int("seriesLimit", opts.SeriesLimit),
opentracinglog.Int("docsLimit", opts.DocsLimit),
xopentracing.Time("start", opts.StartInclusive),
xopentracing.Time("end", opts.EndExclusive),
)
Expand Down Expand Up @@ -795,7 +796,8 @@ func (d *db) AggregateQuery(
sp.LogFields(
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", namespace.String()),
opentracinglog.Int("limit", aggResultOpts.QueryOptions.Limit),
opentracinglog.Int("seriesLimit", aggResultOpts.QueryOptions.SeriesLimit),
opentracinglog.Int("docsLimit", aggResultOpts.QueryOptions.DocsLimit),
xopentracing.Time("start", aggResultOpts.QueryOptions.StartInclusive),
xopentracing.Time("end", aggResultOpts.QueryOptions.EndExclusive),
)
Expand Down
Loading

0 comments on commit 6f05354

Please sign in to comment.