
[dbnode/query] Limit docs per query #2415

Merged

merged 43 commits from ra/limit-per-query-blocks into master on Jun 23, 2020

Commits (43)
c76d60f
doc limit enforcement 1
rallen090 Jun 16, 2020
30a39f7
doc limit enforcement 2
rallen090 Jun 16, 2020
20ea1f8
doc limit enforcement 3
rallen090 Jun 16, 2020
7ab9871
doc limit enforcement 4
rallen090 Jun 16, 2020
9a83aad
doc limit enforcement 5
rallen090 Jun 16, 2020
cfa5ce6
doc limit enforcement 6
rallen090 Jun 16, 2020
6e67cca
doc limit enforcement 7
rallen090 Jun 16, 2020
e6e21ea
doc limit enforcement 8
rallen090 Jun 16, 2020
931d708
doc limit enforcement 9
rallen090 Jun 16, 2020
c69ecc8
doc limit enforcement 10
rallen090 Jun 16, 2020
3f40c43
test cases 1
rallen090 Jun 16, 2020
fd40408
test cases 2
rallen090 Jun 16, 2020
66e42a7
test cases 3
rallen090 Jun 16, 2020
c8b0c31
feedback 1
rallen090 Jun 17, 2020
0a3fc89
feedback 2
rallen090 Jun 17, 2020
fdf37fc
Merge remote-tracking branch 'origin/master' into ra/limit-per-query-…
rallen090 Jun 17, 2020
dad0ac6
test fixes 1
rallen090 Jun 17, 2020
5750876
added back rpc mock
rallen090 Jun 17, 2020
63e670c
added back rpc mock
rallen090 Jun 17, 2020
739d767
Merge branch 'master' into ra/limit-per-query-blocks
rallen090 Jun 22, 2020
05ef04f
field names to test cases
rallen090 Jun 22, 2020
d33111d
redundant arg type
rallen090 Jun 22, 2020
e61695f
return docsCount along with size when adding documents
rallen090 Jun 22, 2020
30b9bb2
return docsCount along with size when adding documents 2
rallen090 Jun 22, 2020
0539c5c
return docsCount along with size when adding documents 3
rallen090 Jun 22, 2020
1fec340
track docs count in agg query path
rallen090 Jun 22, 2020
a124ca8
fix doubling counting exhaustive err metric
rallen090 Jun 22, 2020
8647dcb
gen
rallen090 Jun 22, 2020
958fcbd
gen 2
rallen090 Jun 22, 2020
aa97398
fix series check OR condition
rallen090 Jun 22, 2020
d4a6690
missing places for changing docs and series limits
rallen090 Jun 22, 2020
7bdc86d
missing places for changing docs and series limits 2
rallen090 Jun 22, 2020
cd59d74
missing places for changing docs and series limits 3
rallen090 Jun 22, 2020
fd93c86
missing places for changing docs and series limits 4
rallen090 Jun 22, 2020
3115d0a
missing places for changing docs and series limits 5
rallen090 Jun 22, 2020
3ff1997
missing places for changing docs and series limits 6
rallen090 Jun 22, 2020
d9c3d04
misc test/build fixes 1
rallen090 Jun 22, 2020
15726a5
allow doclimit query param
rallen090 Jun 22, 2020
eb39ac9
allow doclimit query param 2
rallen090 Jun 22, 2020
3a35c82
gen fix
rallen090 Jun 22, 2020
529dc3f
Merge remote-tracking branch 'origin/master' into ra/limit-per-query-…
rallen090 Jun 23, 2020
bd8f954
remove prints
rallen090 Jun 23, 2020
451828a
test case for both limits
rallen090 Jun 23, 2020
12 changes: 9 additions & 3 deletions src/cmd/services/m3query/config/config.go
@@ -76,7 +76,8 @@ var (

defaultCarbonIngesterAggregationType = aggregation.Mean

defaultStorageQueryLimit = 10000
defaultStorageQuerySeriesLimit = 10000
defaultStorageQueryDocsLimit = 0 // Default OFF.
)

// Configuration is the configuration for the query service.
@@ -308,6 +309,9 @@ type PerQueryLimitsConfiguration struct {
// MaxFetchedSeries limits the number of time series returned by a storage node.
MaxFetchedSeries int `yaml:"maxFetchedSeries"`

// MaxFetchedDocs limits the number of index documents matched by a query.
MaxFetchedDocs int `yaml:"maxFetchedDocs"`

// RequireExhaustive results in an error if the query exceeds the series limit.
RequireExhaustive bool `yaml:"requireExhaustive"`
}
@@ -323,13 +327,15 @@ func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerO
func (l *PerQueryLimitsConfiguration) AsFetchOptionsBuilderOptions() handleroptions.FetchOptionsBuilderOptions {
if l.MaxFetchedSeries <= 0 {
return handleroptions.FetchOptionsBuilderOptions{
Limit: defaultStorageQueryLimit,
SeriesLimit: defaultStorageQuerySeriesLimit,
DocsLimit: defaultStorageQueryDocsLimit,
RequireExhaustive: l.RequireExhaustive,
}
}

return handleroptions.FetchOptionsBuilderOptions{
Limit: int(l.MaxFetchedSeries),
SeriesLimit: int(l.MaxFetchedSeries),
DocsLimit: int(l.MaxFetchedDocs),
RequireExhaustive: l.RequireExhaustive,
}
}
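The fallback logic in `AsFetchOptionsBuilderOptions` above is easiest to see in isolation: when `MaxFetchedSeries` is unset (`<= 0`), both defaults apply, and the docs limit defaults to 0, meaning disabled. A minimal sketch of that behavior — the type and function names here are illustrative stand-ins, not the actual m3query code:

```go
package main

import "fmt"

// limitsConfig mirrors the shape of PerQueryLimitsConfiguration for
// illustration only.
type limitsConfig struct {
	MaxFetchedSeries int
	MaxFetchedDocs   int
}

const (
	defaultSeriesLimit = 10000
	defaultDocsLimit   = 0 // Default OFF, matching the PR.
)

// resolve sketches the fallback: if the series limit is unset, both
// defaults are used (note the docs limit is not consulted in that case).
func resolve(c limitsConfig) (seriesLimit, docsLimit int) {
	if c.MaxFetchedSeries <= 0 {
		return defaultSeriesLimit, defaultDocsLimit
	}
	return c.MaxFetchedSeries, c.MaxFetchedDocs
}

func main() {
	s, d := resolve(limitsConfig{})
	fmt.Println(s, d) // → 10000 0

	s, d = resolve(limitsConfig{MaxFetchedSeries: 500, MaxFetchedDocs: 2000})
	fmt.Println(s, d) // → 500 2000
}
```

One consequence of this shape, visible in the diff: setting only `maxFetchedDocs` in YAML without `maxFetchedSeries` would leave the docs limit at its default, since the fallback branch keys off the series limit alone.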
1 change: 1 addition & 0 deletions src/dbnode/generated/thrift/rpc.thrift
@@ -169,6 +169,7 @@ struct FetchTaggedRequest {
6: optional i64 limit
7: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS
8: optional bool requireExhaustive = false
9: optional i64 docsLimit
}

struct FetchTaggedResult {
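Because `docsLimit` is declared `optional i64` with no default, the generated Go code (shown in the next file) represents it as a `*int64`, so "unset" is distinguishable from an explicit 0. A sketch of that pointer semantics with illustrative local types, not the generated `rpc` package itself:

```go
package main

import "fmt"

// fetchTaggedRequest sketches the relevant fields of the generated
// FetchTaggedRequest struct: optional thrift fields without defaults
// become pointers in Go.
type fetchTaggedRequest struct {
	Limit     *int64 // thrift field 6
	DocsLimit *int64 // thrift field 9, added by this PR
}

// GetDocsLimit mirrors the generated accessor: nil means unset, and the
// zero-value default is returned.
func (r *fetchTaggedRequest) GetDocsLimit() int64 {
	if r.DocsLimit == nil {
		return 0 // FetchTaggedRequest_DocsLimit_DEFAULT
	}
	return *r.DocsLimit
}

func main() {
	req := &fetchTaggedRequest{}
	fmt.Println(req.GetDocsLimit(), req.DocsLimit == nil) // → 0 true

	v := int64(5000)
	req.DocsLimit = &v
	fmt.Println(req.GetDocsLimit(), req.DocsLimit == nil) // → 5000 false
}
```

This is also why the generated writer (`writeField9` below) guards on `IsSetDocsLimit()`: an unset optional field is simply omitted from the wire encoding.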
46 changes: 46 additions & 0 deletions src/dbnode/generated/thrift/rpc/rpc.go
@@ -3050,6 +3050,7 @@ func (p *Segment) String() string {
// - Limit
// - RangeTimeType
// - RequireExhaustive
// - DocsLimit
type FetchTaggedRequest struct {
NameSpace []byte `thrift:"nameSpace,1,required" db:"nameSpace" json:"nameSpace"`
Query []byte `thrift:"query,2,required" db:"query" json:"query"`
@@ -3059,6 +3060,7 @@ type FetchTaggedRequest struct {
Limit *int64 `thrift:"limit,6" db:"limit" json:"limit,omitempty"`
RangeTimeType TimeType `thrift:"rangeTimeType,7" db:"rangeTimeType" json:"rangeTimeType,omitempty"`
RequireExhaustive bool `thrift:"requireExhaustive,8" db:"requireExhaustive" json:"requireExhaustive,omitempty"`
DocsLimit *int64 `thrift:"docsLimit,9" db:"docsLimit" json:"docsLimit,omitempty"`
}

func NewFetchTaggedRequest() *FetchTaggedRequest {
@@ -3107,6 +3109,15 @@ var FetchTaggedRequest_RequireExhaustive_DEFAULT bool = false
func (p *FetchTaggedRequest) GetRequireExhaustive() bool {
return p.RequireExhaustive
}

var FetchTaggedRequest_DocsLimit_DEFAULT int64

func (p *FetchTaggedRequest) GetDocsLimit() int64 {
if !p.IsSetDocsLimit() {
return FetchTaggedRequest_DocsLimit_DEFAULT
}
return *p.DocsLimit
}
func (p *FetchTaggedRequest) IsSetLimit() bool {
return p.Limit != nil
}
@@ -3119,6 +3130,10 @@ func (p *FetchTaggedRequest) IsSetRequireExhaustive() bool {
return p.RequireExhaustive != FetchTaggedRequest_RequireExhaustive_DEFAULT
}

func (p *FetchTaggedRequest) IsSetDocsLimit() bool {
return p.DocsLimit != nil
}

func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -3176,6 +3191,10 @@ func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error {
if err := p.ReadField8(iprot); err != nil {
return err
}
case 9:
if err := p.ReadField9(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@@ -3279,6 +3298,15 @@ func (p *FetchTaggedRequest) ReadField8(iprot thrift.TProtocol) error {
return nil
}

func (p *FetchTaggedRequest) ReadField9(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 9: ", err)
} else {
p.DocsLimit = &v
}
return nil
}

func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("FetchTaggedRequest"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
@@ -3308,6 +3336,9 @@ func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error {
if err := p.writeField8(oprot); err != nil {
return err
}
if err := p.writeField9(oprot); err != nil {
return err
}
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
@@ -3428,6 +3459,21 @@ func (p *FetchTaggedRequest) writeField8(oprot thrift.TProtocol) (err error) {
return err
}

func (p *FetchTaggedRequest) writeField9(oprot thrift.TProtocol) (err error) {
if p.IsSetDocsLimit() {
if err := oprot.WriteFieldBegin("docsLimit", thrift.I64, 9); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 9:docsLimit: ", p), err)
}
if err := oprot.WriteI64(int64(*p.DocsLimit)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.docsLimit (9) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 9:docsLimit: ", p), err)
}
}
return err
}

func (p *FetchTaggedRequest) String() string {
if p == nil {
return "<nil>"
@@ -121,7 +121,7 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR
EndExclusive: end,
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}

session, err := s.session()
22 changes: 15 additions & 7 deletions src/dbnode/network/server/tchannelthrift/convert/convert.go
@@ -224,7 +224,10 @@ func FromRPCFetchTaggedRequest(
RequireExhaustive: req.RequireExhaustive,
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}
if l := req.DocsLimit; l != nil {
opts.DocsLimit = int(*l)
}

q, err := idx.Unmarshal(req.Query)
@@ -273,11 +276,16 @@ func ToRPCFetchTaggedRequest(
RequireExhaustive: opts.RequireExhaustive,
}

if opts.Limit > 0 {
l := int64(opts.Limit)
if opts.SeriesLimit > 0 {
l := int64(opts.SeriesLimit)
request.Limit = &l
}

if opts.DocsLimit > 0 {
l := int64(opts.DocsLimit)
request.DocsLimit = &l
}

return request, nil
}

@@ -302,7 +310,7 @@ func FromRPCAggregateQueryRequest(
},
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}

query, err := FromRPCQuery(req.Query)
@@ -347,7 +355,7 @@ func FromRPCAggregateQueryRawRequest(
},
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}

query, err := idx.Unmarshal(req.Query)
@@ -394,8 +402,8 @@ func ToRPCAggregateQueryRawRequest(
RangeEnd: rangeEnd,
}

if opts.Limit > 0 {
l := int64(opts.Limit)
if opts.SeriesLimit > 0 {
l := int64(opts.SeriesLimit)
request.Limit = &l
}

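A detail worth noticing in `ToRPCFetchTaggedRequest` above: limits are only encoded onto the RPC request when they are positive, so a zero ("disabled") limit never travels on the wire as a set field. A minimal sketch of that rule with illustrative local types standing in for `index.QueryOptions` and the generated request:

```go
package main

import "fmt"

// queryOptions stands in for index.QueryOptions.
type queryOptions struct {
	SeriesLimit int
	DocsLimit   int
}

// rpcRequest stands in for the generated rpc.FetchTaggedRequest.
type rpcRequest struct {
	Limit     *int64
	DocsLimit *int64
}

// toRPC mirrors the conversion rule: only positive limits are encoded.
func toRPC(opts queryOptions) rpcRequest {
	var req rpcRequest
	if opts.SeriesLimit > 0 {
		l := int64(opts.SeriesLimit)
		req.Limit = &l
	}
	if opts.DocsLimit > 0 {
		l := int64(opts.DocsLimit)
		req.DocsLimit = &l
	}
	return req
}

func main() {
	// A query with only a series limit leaves docsLimit unset on the wire.
	req := toRPC(queryOptions{SeriesLimit: 10})
	fmt.Println(req.Limit != nil, req.DocsLimit != nil) // → true false
}
```

On the receiving side, `FromRPCFetchTaggedRequest` applies the inverse rule: each limit is copied into the options only when the pointer is non-nil, so the round trip preserves "disabled".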
@@ -105,7 +105,7 @@ func TestConvertFetchTaggedRequest(t *testing.T) {
opts := index.QueryOptions{
StartInclusive: time.Now().Add(-900 * time.Hour),
EndExclusive: time.Now(),
Limit: 10,
SeriesLimit: 10,
}
fetchData := true
var limit int64 = 10
@@ -171,7 +171,7 @@ func TestConvertAggregateRawQueryRequest(t *testing.T) {
QueryOptions: index.QueryOptions{
StartInclusive: time.Now().Add(-900 * time.Hour),
EndExclusive: time.Now(),
Limit: 10,
SeriesLimit: 10,
},
Type: index.AggregateTagNamesAndValues,
FieldFilter: index.AggregateFieldFilter{
2 changes: 1 addition & 1 deletion src/dbnode/network/server/tchannelthrift/node/service.go
@@ -485,7 +485,7 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query
EndExclusive: end,
}
if l := req.Limit; l != nil {
opts.Limit = int(*l)
opts.SeriesLimit = int(*l)
}
queryResult, err := db.QueryIDs(ctx, nsID, index.Query{Query: q}, opts)
if err != nil {
14 changes: 7 additions & 7 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
@@ -303,7 +303,7 @@ func TestServiceQuery(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

limit := int64(10)
@@ -495,7 +495,7 @@ func TestServiceQueryUnknownErr(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{}, unknownErr)

limit := int64(10)
@@ -1596,7 +1596,7 @@ func TestServiceFetchTagged(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
@@ -1792,7 +1792,7 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
@@ -1865,7 +1865,7 @@ func TestServiceFetchTaggedErrs(t *testing.T) {
index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
}).Return(index.QueryResult{}, fmt.Errorf("random err"))
_, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{
NameSpace: []byte(nsID),
@@ -1915,7 +1915,7 @@ func TestServiceAggregate(t *testing.T) {
QueryOptions: index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
},
FieldFilter: index.AggregateFieldFilter{
[]byte("foo"), []byte("bar"),
@@ -1998,7 +1998,7 @@ func TestServiceAggregateNameOnly(t *testing.T) {
QueryOptions: index.QueryOptions{
StartInclusive: start,
EndExclusive: end,
Limit: 10,
SeriesLimit: 10,
},
FieldFilter: index.AggregateFieldFilter{
[]byte("foo"), []byte("bar"),
6 changes: 4 additions & 2 deletions src/dbnode/storage/database.go
@@ -767,7 +767,8 @@ func (d *db) QueryIDs(
sp.LogFields(
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", namespace.String()),
opentracinglog.Int("limit", opts.Limit),
opentracinglog.Int("seriesLimit", opts.SeriesLimit),
opentracinglog.Int("docsLimit", opts.DocsLimit),
xopentracing.Time("start", opts.StartInclusive),
xopentracing.Time("end", opts.EndExclusive),
)
@@ -795,7 +796,8 @@ func (d *db) AggregateQuery(
sp.LogFields(
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", namespace.String()),
opentracinglog.Int("limit", aggResultOpts.QueryOptions.Limit),
opentracinglog.Int("seriesLimit", aggResultOpts.QueryOptions.SeriesLimit),
opentracinglog.Int("docsLimit", aggResultOpts.QueryOptions.DocsLimit),
xopentracing.Time("start", aggResultOpts.QueryOptions.StartInclusive),
xopentracing.Time("end", aggResultOpts.QueryOptions.EndExclusive),
)
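The commit list above includes a fix to the "series check OR condition", which suggests how the two limits combine: a result is exhaustive only if neither limit was hit, and a limit of 0 means disabled. This is a hypothetical sketch of that combination logic, not the PR's actual enforcement code:

```go
package main

import "fmt"

// exhaustive sketches dual-limit semantics: a query result is
// exhaustive only if neither the series limit nor the docs limit was
// reached. A limit of 0 disables that check (illustrative assumption
// consistent with the "Default OFF" docs limit in the config diff).
func exhaustive(seriesLimit, seriesCount, docsLimit, docsCount int) bool {
	seriesHit := seriesLimit > 0 && seriesCount >= seriesLimit
	docsHit := docsLimit > 0 && docsCount >= docsLimit
	return !seriesHit && !docsHit
}

func main() {
	fmt.Println(exhaustive(10, 5, 0, 1000))  // docs limit off → true
	fmt.Println(exhaustive(10, 10, 0, 1000)) // series limit hit → false
	fmt.Println(exhaustive(10, 5, 500, 600)) // docs limit hit → false
}
```

Under `RequireExhaustive: true` in the config diff above, a non-exhaustive result of this kind surfaces as a query error rather than a silently truncated response.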