From af43b8c902a8baa4ebc7246abad3269bc876dead Mon Sep 17 00:00:00 2001 From: "artem@chronosphere.io" Date: Tue, 17 Nov 2020 09:34:31 -0500 Subject: [PATCH] Hopefully git issues resolved? --- src/dbnode/generated/thrift/rpc.thrift | 8 + src/dbnode/generated/thrift/rpc/rpc.go | 344 ++++++++++++++++++ .../server/tchannelthrift/cluster/service.go | 4 + .../server/tchannelthrift/convert/convert.go | 18 +- .../server/tchannelthrift/node/service.go | 34 +- src/dbnode/persist/fs/retriever.go | 11 +- src/dbnode/server/options.go | 8 +- src/dbnode/server/server.go | 78 ++-- src/dbnode/storage/index/block.go | 16 +- src/dbnode/storage/index/types.go | 2 + .../storage/limits/noop_query_limits.go | 2 +- src/dbnode/storage/limits/options.go | 107 ++++++ src/dbnode/storage/limits/query_limits.go | 52 ++- .../storage/limits/query_limits_test.go | 91 ++++- src/dbnode/storage/limits/source_logger.go | 72 ++++ .../storage/limits/source_logger_test.go | 92 +++++ src/dbnode/storage/limits/types.go | 53 ++- src/dbnode/storage/options.go | 23 ++ src/dbnode/storage/storage_mock.go | 57 +++ src/dbnode/storage/types.go | 16 + src/query/api/v1/handler/graphite/render.go | 4 + .../handleroptions/fetch_options.go | 4 + src/query/generated/proto/rpcpb/query.pb.go | 263 +++++++------ src/query/generated/proto/rpcpb/query.proto | 1 + src/query/graphite/common/context.go | 3 + src/query/graphite/native/expression.go | 1 + src/query/graphite/storage/m3_wrapper.go | 1 + src/query/graphite/storage/storage.go | 2 + src/query/remote/codecs.go | 4 +- src/query/storage/index.go | 2 + src/query/storage/types.go | 2 + src/x/headers/headers.go | 3 + 32 files changed, 1187 insertions(+), 191 deletions(-) create mode 100644 src/dbnode/storage/limits/options.go create mode 100644 src/dbnode/storage/limits/source_logger.go create mode 100644 src/dbnode/storage/limits/source_logger_test.go diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 4180b2ec90..112405c525 
100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -93,6 +93,7 @@ struct FetchRequest { 4: required string id 5: optional TimeType rangeType = TimeType.UNIX_SECONDS 6: optional TimeType resultTimeType = TimeType.UNIX_SECONDS + 7: optional binary source } struct FetchResult { @@ -125,12 +126,14 @@ struct FetchBatchRawRequest { 3: required binary nameSpace 4: required list ids 5: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS + 6: optional binary source } struct FetchBatchRawV2Request { 1: required list nameSpaces 2: required list elements + 3: optional binary source } struct FetchBatchRawV2RequestElement { @@ -173,6 +176,7 @@ struct FetchTaggedRequest { 7: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS 8: optional bool requireExhaustive = true 9: optional i64 docsLimit + 10: optional binary source } struct FetchTaggedResult { @@ -194,6 +198,7 @@ struct FetchBlocksRawRequest { 1: required binary nameSpace 2: required i32 shard 3: required list elements + 4: optional binary source } struct FetchBlocksRawRequestElement { @@ -391,6 +396,7 @@ struct AggregateQueryRawRequest { 6: optional list tagNameFilter 7: optional AggregateQueryType aggregateQueryType = AggregateQueryType.AGGREGATE_BY_TAG_NAME_VALUE 8: optional TimeType rangeType = TimeType.UNIX_SECONDS + 9: optional binary source } struct AggregateQueryRawResult { @@ -417,6 +423,7 @@ struct AggregateQueryRequest { 6: optional list tagNameFilter 7: optional AggregateQueryType aggregateQueryType = AggregateQueryType.AGGREGATE_BY_TAG_NAME_VALUE 8: optional TimeType rangeType = TimeType.UNIX_SECONDS + 9: optional binary source } struct AggregateQueryResult { @@ -443,6 +450,7 @@ struct QueryRequest { 6: optional bool noData 7: optional TimeType rangeType = TimeType.UNIX_SECONDS 8: optional TimeType resultTimeType = TimeType.UNIX_SECONDS + 9: optional binary source } struct QueryResult { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go 
b/src/dbnode/generated/thrift/rpc/rpc.go index b658370a1d..52d5696af3 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -495,6 +495,7 @@ func (p *WriteBatchRawErrors) Error() string { // - ID // - RangeType // - ResultTimeType +// - Source type FetchRequest struct { RangeStart int64 `thrift:"rangeStart,1,required" db:"rangeStart" json:"rangeStart"` RangeEnd int64 `thrift:"rangeEnd,2,required" db:"rangeEnd" json:"rangeEnd"` @@ -502,6 +503,7 @@ type FetchRequest struct { ID string `thrift:"id,4,required" db:"id" json:"id"` RangeType TimeType `thrift:"rangeType,5" db:"rangeType" json:"rangeType,omitempty"` ResultTimeType TimeType `thrift:"resultTimeType,6" db:"resultTimeType" json:"resultTimeType,omitempty"` + Source []byte `thrift:"source,7" db:"source" json:"source,omitempty"` } func NewFetchRequest() *FetchRequest { @@ -539,6 +541,12 @@ var FetchRequest_ResultTimeType_DEFAULT TimeType = 0 func (p *FetchRequest) GetResultTimeType() TimeType { return p.ResultTimeType } + +var FetchRequest_Source_DEFAULT []byte + +func (p *FetchRequest) GetSource() []byte { + return p.Source +} func (p *FetchRequest) IsSetRangeType() bool { return p.RangeType != FetchRequest_RangeType_DEFAULT } @@ -547,6 +555,10 @@ func (p *FetchRequest) IsSetResultTimeType() bool { return p.ResultTimeType != FetchRequest_ResultTimeType_DEFAULT } +func (p *FetchRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *FetchRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -594,6 +606,10 @@ func (p *FetchRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField6(iprot); err != nil { return err } + case 7: + if err := p.ReadField7(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -677,6 +693,15 @@ func (p *FetchRequest) ReadField6(iprot thrift.TProtocol) error 
{ return nil } +func (p *FetchRequest) ReadField7(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 7: ", err) + } else { + p.Source = v + } + return nil +} + func (p *FetchRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("FetchRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -700,6 +725,9 @@ func (p *FetchRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField6(oprot); err != nil { return err } + if err := p.writeField7(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -792,6 +820,21 @@ func (p *FetchRequest) writeField6(oprot thrift.TProtocol) (err error) { return err } +func (p *FetchRequest) writeField7(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 7); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 7:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (7) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 7:source: ", p), err) + } + } + return err +} + func (p *FetchRequest) String() string { if p == nil { return "" @@ -1590,12 +1633,14 @@ func (p *WriteTaggedRequest) String() string { // - NameSpace // - Ids // - RangeTimeType +// - Source type FetchBatchRawRequest struct { RangeStart int64 `thrift:"rangeStart,1,required" db:"rangeStart" json:"rangeStart"` RangeEnd int64 `thrift:"rangeEnd,2,required" db:"rangeEnd" json:"rangeEnd"` NameSpace []byte `thrift:"nameSpace,3,required" db:"nameSpace" json:"nameSpace"` Ids [][]byte `thrift:"ids,4,required" db:"ids" json:"ids"` RangeTimeType TimeType 
`thrift:"rangeTimeType,5" db:"rangeTimeType" json:"rangeTimeType,omitempty"` + Source []byte `thrift:"source,6" db:"source" json:"source,omitempty"` } func NewFetchBatchRawRequest() *FetchBatchRawRequest { @@ -1625,10 +1670,20 @@ var FetchBatchRawRequest_RangeTimeType_DEFAULT TimeType = 0 func (p *FetchBatchRawRequest) GetRangeTimeType() TimeType { return p.RangeTimeType } + +var FetchBatchRawRequest_Source_DEFAULT []byte + +func (p *FetchBatchRawRequest) GetSource() []byte { + return p.Source +} func (p *FetchBatchRawRequest) IsSetRangeTimeType() bool { return p.RangeTimeType != FetchBatchRawRequest_RangeTimeType_DEFAULT } +func (p *FetchBatchRawRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *FetchBatchRawRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -1672,6 +1727,10 @@ func (p *FetchBatchRawRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField5(iprot); err != nil { return err } + case 6: + if err := p.ReadField6(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -1758,6 +1817,15 @@ func (p *FetchBatchRawRequest) ReadField5(iprot thrift.TProtocol) error { return nil } +func (p *FetchBatchRawRequest) ReadField6(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 6: ", err) + } else { + p.Source = v + } + return nil +} + func (p *FetchBatchRawRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("FetchBatchRawRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -1778,6 +1846,9 @@ func (p *FetchBatchRawRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField5(oprot); err != nil { return err } + if err := p.writeField6(oprot); err != nil { + return err + } } if err := 
oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -1863,6 +1934,21 @@ func (p *FetchBatchRawRequest) writeField5(oprot thrift.TProtocol) (err error) { return err } +func (p *FetchBatchRawRequest) writeField6(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 6); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (6) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 6:source: ", p), err) + } + } + return err +} + func (p *FetchBatchRawRequest) String() string { if p == nil { return "" @@ -1873,9 +1959,11 @@ func (p *FetchBatchRawRequest) String() string { // Attributes: // - NameSpaces // - Elements +// - Source type FetchBatchRawV2Request struct { NameSpaces [][]byte `thrift:"nameSpaces,1,required" db:"nameSpaces" json:"nameSpaces"` Elements []*FetchBatchRawV2RequestElement `thrift:"elements,2,required" db:"elements" json:"elements"` + Source []byte `thrift:"source,3" db:"source" json:"source,omitempty"` } func NewFetchBatchRawV2Request() *FetchBatchRawV2Request { @@ -1889,6 +1977,16 @@ func (p *FetchBatchRawV2Request) GetNameSpaces() [][]byte { func (p *FetchBatchRawV2Request) GetElements() []*FetchBatchRawV2RequestElement { return p.Elements } + +var FetchBatchRawV2Request_Source_DEFAULT []byte + +func (p *FetchBatchRawV2Request) GetSource() []byte { + return p.Source +} +func (p *FetchBatchRawV2Request) IsSetSource() bool { + return p.Source != nil +} + func (p *FetchBatchRawV2Request) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -1916,6 +2014,10 @@ func (p 
*FetchBatchRawV2Request) Read(iprot thrift.TProtocol) error { return err } issetElements = true + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -1981,6 +2083,15 @@ func (p *FetchBatchRawV2Request) ReadField2(iprot thrift.TProtocol) error { return nil } +func (p *FetchBatchRawV2Request) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.Source = v + } + return nil +} + func (p *FetchBatchRawV2Request) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("FetchBatchRawV2Request"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -1992,6 +2103,9 @@ func (p *FetchBatchRawV2Request) Write(oprot thrift.TProtocol) error { if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -2044,6 +2158,21 @@ func (p *FetchBatchRawV2Request) writeField2(oprot thrift.TProtocol) (err error) return err } +func (p *FetchBatchRawV2Request) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:source: ", p), err) + } + } + return err +} + func (p *FetchBatchRawV2Request) String() string { if p == nil { return "" @@ -3051,6 +3180,7 @@ func (p *Segment) String() string { // - RangeTimeType // - 
RequireExhaustive // - DocsLimit +// - Source type FetchTaggedRequest struct { NameSpace []byte `thrift:"nameSpace,1,required" db:"nameSpace" json:"nameSpace"` Query []byte `thrift:"query,2,required" db:"query" json:"query"` @@ -3061,6 +3191,7 @@ type FetchTaggedRequest struct { RangeTimeType TimeType `thrift:"rangeTimeType,7" db:"rangeTimeType" json:"rangeTimeType,omitempty"` RequireExhaustive bool `thrift:"requireExhaustive,8" db:"requireExhaustive" json:"requireExhaustive,omitempty"` DocsLimit *int64 `thrift:"docsLimit,9" db:"docsLimit" json:"docsLimit,omitempty"` + Source []byte `thrift:"source,10" db:"source" json:"source,omitempty"` } func NewFetchTaggedRequest() *FetchTaggedRequest { @@ -3120,6 +3251,12 @@ func (p *FetchTaggedRequest) GetDocsLimit() int64 { } return *p.DocsLimit } + +var FetchTaggedRequest_Source_DEFAULT []byte + +func (p *FetchTaggedRequest) GetSource() []byte { + return p.Source +} func (p *FetchTaggedRequest) IsSetLimit() bool { return p.Limit != nil } @@ -3136,6 +3273,10 @@ func (p *FetchTaggedRequest) IsSetDocsLimit() bool { return p.DocsLimit != nil } +func (p *FetchTaggedRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -3197,6 +3338,10 @@ func (p *FetchTaggedRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField9(iprot); err != nil { return err } + case 10: + if err := p.ReadField10(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -3309,6 +3454,15 @@ func (p *FetchTaggedRequest) ReadField9(iprot thrift.TProtocol) error { return nil } +func (p *FetchTaggedRequest) ReadField10(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 10: ", err) + } else { + p.Source = v + } + return nil +} + func (p 
*FetchTaggedRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("FetchTaggedRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -3341,6 +3495,9 @@ func (p *FetchTaggedRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField9(oprot); err != nil { return err } + if err := p.writeField10(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -3476,6 +3633,21 @@ func (p *FetchTaggedRequest) writeField9(oprot thrift.TProtocol) (err error) { return err } +func (p *FetchTaggedRequest) writeField10(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 10); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 10:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (10) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 10:source: ", p), err) + } + } + return err +} + func (p *FetchTaggedRequest) String() string { if p == nil { return "" @@ -3933,10 +4105,12 @@ func (p *FetchTaggedIDResult_) String() string { // - NameSpace // - Shard // - Elements +// - Source type FetchBlocksRawRequest struct { NameSpace []byte `thrift:"nameSpace,1,required" db:"nameSpace" json:"nameSpace"` Shard int32 `thrift:"shard,2,required" db:"shard" json:"shard"` Elements []*FetchBlocksRawRequestElement `thrift:"elements,3,required" db:"elements" json:"elements"` + Source []byte `thrift:"source,4" db:"source" json:"source,omitempty"` } func NewFetchBlocksRawRequest() *FetchBlocksRawRequest { @@ -3954,6 +4128,16 @@ func (p *FetchBlocksRawRequest) GetShard() int32 { func (p *FetchBlocksRawRequest) GetElements() []*FetchBlocksRawRequestElement { 
return p.Elements } + +var FetchBlocksRawRequest_Source_DEFAULT []byte + +func (p *FetchBlocksRawRequest) GetSource() []byte { + return p.Source +} +func (p *FetchBlocksRawRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *FetchBlocksRawRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -3987,6 +4171,10 @@ func (p *FetchBlocksRawRequest) Read(iprot thrift.TProtocol) error { return err } issetElements = true + case 4: + if err := p.ReadField4(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -4049,6 +4237,15 @@ func (p *FetchBlocksRawRequest) ReadField3(iprot thrift.TProtocol) error { return nil } +func (p *FetchBlocksRawRequest) ReadField4(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 4: ", err) + } else { + p.Source = v + } + return nil +} + func (p *FetchBlocksRawRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("FetchBlocksRawRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -4063,6 +4260,9 @@ func (p *FetchBlocksRawRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField3(oprot); err != nil { return err } + if err := p.writeField4(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -4120,6 +4320,21 @@ func (p *FetchBlocksRawRequest) writeField3(oprot thrift.TProtocol) (err error) return err } +func (p *FetchBlocksRawRequest) writeField4(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:source: ", p), err) + } + if err := 
oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (4) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:source: ", p), err) + } + } + return err +} + func (p *FetchBlocksRawRequest) String() string { if p == nil { return "" @@ -9298,6 +9513,7 @@ func (p *HealthResult_) String() string { // - TagNameFilter // - AggregateQueryType // - RangeType +// - Source type AggregateQueryRawRequest struct { Query []byte `thrift:"query,1,required" db:"query" json:"query"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` @@ -9307,6 +9523,7 @@ type AggregateQueryRawRequest struct { TagNameFilter [][]byte `thrift:"tagNameFilter,6" db:"tagNameFilter" json:"tagNameFilter,omitempty"` AggregateQueryType AggregateQueryType `thrift:"aggregateQueryType,7" db:"aggregateQueryType" json:"aggregateQueryType,omitempty"` RangeType TimeType `thrift:"rangeType,8" db:"rangeType" json:"rangeType,omitempty"` + Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` } func NewAggregateQueryRawRequest() *AggregateQueryRawRequest { @@ -9359,6 +9576,12 @@ var AggregateQueryRawRequest_RangeType_DEFAULT TimeType = 0 func (p *AggregateQueryRawRequest) GetRangeType() TimeType { return p.RangeType } + +var AggregateQueryRawRequest_Source_DEFAULT []byte + +func (p *AggregateQueryRawRequest) GetSource() []byte { + return p.Source +} func (p *AggregateQueryRawRequest) IsSetLimit() bool { return p.Limit != nil } @@ -9375,6 +9598,10 @@ func (p *AggregateQueryRawRequest) IsSetRangeType() bool { return p.RangeType != AggregateQueryRawRequest_RangeType_DEFAULT } +func (p *AggregateQueryRawRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *AggregateQueryRawRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", 
p), err) @@ -9430,6 +9657,10 @@ func (p *AggregateQueryRawRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField8(iprot); err != nil { return err } + case 9: + if err := p.ReadField9(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -9544,6 +9775,15 @@ func (p *AggregateQueryRawRequest) ReadField8(iprot thrift.TProtocol) error { return nil } +func (p *AggregateQueryRawRequest) ReadField9(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 9: ", err) + } else { + p.Source = v + } + return nil +} + func (p *AggregateQueryRawRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("AggregateQueryRawRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -9573,6 +9813,9 @@ func (p *AggregateQueryRawRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField8(oprot); err != nil { return err } + if err := p.writeField9(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -9703,6 +9946,21 @@ func (p *AggregateQueryRawRequest) writeField8(oprot thrift.TProtocol) (err erro return err } +func (p *AggregateQueryRawRequest) writeField9(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 9); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 9:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (9) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 9:source: ", p), err) + } + } + return err +} + func (p *AggregateQueryRawRequest) String() string { if p == nil { return "" @@ 
-10137,6 +10395,7 @@ func (p *AggregateQueryRawResultTagValueElement) String() string { // - TagNameFilter // - AggregateQueryType // - RangeType +// - Source type AggregateQueryRequest struct { Query *Query `thrift:"query,1" db:"query" json:"query,omitempty"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` @@ -10146,6 +10405,7 @@ type AggregateQueryRequest struct { TagNameFilter []string `thrift:"tagNameFilter,6" db:"tagNameFilter" json:"tagNameFilter,omitempty"` AggregateQueryType AggregateQueryType `thrift:"aggregateQueryType,7" db:"aggregateQueryType" json:"aggregateQueryType,omitempty"` RangeType TimeType `thrift:"rangeType,8" db:"rangeType" json:"rangeType,omitempty"` + Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` } func NewAggregateQueryRequest() *AggregateQueryRequest { @@ -10203,6 +10463,12 @@ var AggregateQueryRequest_RangeType_DEFAULT TimeType = 0 func (p *AggregateQueryRequest) GetRangeType() TimeType { return p.RangeType } + +var AggregateQueryRequest_Source_DEFAULT []byte + +func (p *AggregateQueryRequest) GetSource() []byte { + return p.Source +} func (p *AggregateQueryRequest) IsSetQuery() bool { return p.Query != nil } @@ -10223,6 +10489,10 @@ func (p *AggregateQueryRequest) IsSetRangeType() bool { return p.RangeType != AggregateQueryRequest_RangeType_DEFAULT } +func (p *AggregateQueryRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *AggregateQueryRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -10276,6 +10546,10 @@ func (p *AggregateQueryRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField8(iprot); err != nil { return err } + case 9: + if err := p.ReadField9(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -10386,6 +10660,15 @@ func (p *AggregateQueryRequest) ReadField8(iprot 
thrift.TProtocol) error { return nil } +func (p *AggregateQueryRequest) ReadField9(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 9: ", err) + } else { + p.Source = v + } + return nil +} + func (p *AggregateQueryRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("AggregateQueryRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -10415,6 +10698,9 @@ func (p *AggregateQueryRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField8(oprot); err != nil { return err } + if err := p.writeField9(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -10547,6 +10833,21 @@ func (p *AggregateQueryRequest) writeField8(oprot thrift.TProtocol) (err error) return err } +func (p *AggregateQueryRequest) writeField9(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 9); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 9:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (9) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 9:source: ", p), err) + } + } + return err +} + func (p *AggregateQueryRequest) String() string { if p == nil { return "" @@ -10981,6 +11282,7 @@ func (p *AggregateQueryResultTagValueElement) String() string { // - NoData // - RangeType // - ResultTimeType +// - Source type QueryRequest struct { Query *Query `thrift:"query,1,required" db:"query" json:"query"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` @@ -10990,6 +11292,7 @@ type QueryRequest struct { NoData *bool 
`thrift:"noData,6" db:"noData" json:"noData,omitempty"` RangeType TimeType `thrift:"rangeType,7" db:"rangeType" json:"rangeType,omitempty"` ResultTimeType TimeType `thrift:"resultTimeType,8" db:"resultTimeType" json:"resultTimeType,omitempty"` + Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` } func NewQueryRequest() *QueryRequest { @@ -11050,6 +11353,12 @@ var QueryRequest_ResultTimeType_DEFAULT TimeType = 0 func (p *QueryRequest) GetResultTimeType() TimeType { return p.ResultTimeType } + +var QueryRequest_Source_DEFAULT []byte + +func (p *QueryRequest) GetSource() []byte { + return p.Source +} func (p *QueryRequest) IsSetQuery() bool { return p.Query != nil } @@ -11070,6 +11379,10 @@ func (p *QueryRequest) IsSetResultTimeType() bool { return p.ResultTimeType != QueryRequest_ResultTimeType_DEFAULT } +func (p *QueryRequest) IsSetSource() bool { + return p.Source != nil +} + func (p *QueryRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -11125,6 +11438,10 @@ func (p *QueryRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField8(iprot); err != nil { return err } + case 9: + if err := p.ReadField9(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -11225,6 +11542,15 @@ func (p *QueryRequest) ReadField8(iprot thrift.TProtocol) error { return nil } +func (p *QueryRequest) ReadField9(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBinary(); err != nil { + return thrift.PrependError("error reading field 9: ", err) + } else { + p.Source = v + } + return nil +} + func (p *QueryRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("QueryRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -11254,6 +11580,9 @@ func (p *QueryRequest) Write(oprot thrift.TProtocol) error { if err := 
p.writeField8(oprot); err != nil { return err } + if err := p.writeField9(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -11376,6 +11705,21 @@ func (p *QueryRequest) writeField8(oprot thrift.TProtocol) (err error) { return err } +func (p *QueryRequest) writeField9(oprot thrift.TProtocol) (err error) { + if p.IsSetSource() { + if err := oprot.WriteFieldBegin("source", thrift.STRING, 9); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 9:source: ", p), err) + } + if err := oprot.WriteBinary(p.Source); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.source (9) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 9:source: ", p), err) + } + } + return err +} + func (p *QueryRequest) String() string { if p == nil { return "" diff --git a/src/dbnode/network/server/tchannelthrift/cluster/service.go b/src/dbnode/network/server/tchannelthrift/cluster/service.go index c0cc0dc05b..40891ef573 100644 --- a/src/dbnode/network/server/tchannelthrift/cluster/service.go +++ b/src/dbnode/network/server/tchannelthrift/cluster/service.go @@ -120,9 +120,13 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR StartInclusive: start, EndExclusive: end, } + if l := req.Limit; l != nil { opts.SeriesLimit = int(*l) } + if len(req.Source) > 0 { + opts.Source = req.Source + } session, err := s.session() if err != nil { diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go index 6d40c56c6f..b2d4ac20f1 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go @@ -229,6 +229,9 @@ func FromRPCFetchTaggedRequest( if l := req.DocsLimit; l != nil { opts.DocsLimit = int(*l) 
} + if len(req.Source) > 0 { + opts.Source = req.Source + } q, err := idx.Unmarshal(req.Query) if err != nil { @@ -286,6 +289,10 @@ func ToRPCFetchTaggedRequest( request.DocsLimit = &l } + if len(opts.Source) > 0 { + request.Source = opts.Source + } + return request, nil } @@ -312,6 +319,9 @@ func FromRPCAggregateQueryRequest( if l := req.Limit; l != nil { opts.SeriesLimit = int(*l) } + if len(req.Source) > 0 { + opts.Source = req.Source + } query, err := FromRPCQuery(req.Query) if err != nil { @@ -357,7 +367,9 @@ func FromRPCAggregateQueryRawRequest( if l := req.Limit; l != nil { opts.SeriesLimit = int(*l) } - + if len(req.Source) > 0 { + opts.Source = req.Source + } query, err := idx.Unmarshal(req.Query) if err != nil { return nil, index.Query{}, index.AggregationOptions{}, err @@ -407,6 +419,10 @@ func ToRPCAggregateQueryRawRequest( request.Limit = &l } + if len(opts.Source) > 0 { + request.Source = opts.Source + } + query, queryErr := idx.Marshal(q.Query) if queryErr != nil { return rpc.AggregateQueryRawRequest{}, queryErr diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 871dd91b20..9efe6b78cd 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -21,6 +21,7 @@ package node import ( + goctx "context" "errors" "fmt" "runtime" @@ -36,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts/writes" "github.com/m3db/m3/src/dbnode/x/xio" @@ -453,7 +455,8 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR } defer s.readRPCCompleted() - ctx, sp, sampled := tchannelthrift.Context(tctx).StartSampledTraceSpan(tracepoint.Query) + ctx := 
addSourceToContext(tctx, req.Source) + ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.Query) if sampled { sp.LogFields( opentracinglog.String("query", req.Query.String()), @@ -492,6 +495,9 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query if l := req.Limit; l != nil { opts.SeriesLimit = int(*l) } + if len(req.Source) > 0 { + opts.Source = req.Source + } queryResult, err := db.QueryIDs(ctx, nsID, index.Query{Query: q}, opts) if err != nil { return nil, convert.ToRPCError(err) @@ -614,11 +620,12 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR var ( callStart = s.nowFn() - ctx = tchannelthrift.Context(tctx) + ctx = addSourceToContext(tctx, req.Source) start, rangeStartErr = convert.ToTime(req.RangeStart, req.RangeType) end, rangeEndErr = convert.ToTime(req.RangeEnd, req.RangeType) ) + if rangeStartErr != nil || rangeEndErr != nil { s.metrics.fetch.ReportError(s.nowFn().Sub(callStart)) return nil, tterrors.NewBadRequestError(xerrors.FirstError(rangeStartErr, rangeEndErr)) @@ -697,7 +704,8 @@ func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) } defer s.readRPCCompleted() - ctx, sp, sampled := tchannelthrift.Context(tctx).StartSampledTraceSpan(tracepoint.FetchTagged) + ctx := addSourceToContext(tctx, req.Source) + ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchTagged) if sampled { sp.LogFields( opentracinglog.String("query", string(req.Query)), @@ -895,7 +903,7 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe defer s.readRPCCompleted() callStart := s.nowFn() - ctx := tchannelthrift.Context(tctx) + ctx := addSourceToContext(tctx, req.Source) ns, query, opts, err := convert.FromRPCAggregateQueryRawRequest(req, s.pools) if err != nil { @@ -966,7 +974,7 @@ func (s *service) FetchBatchRaw(tctx thrift.Context, req *rpc.FetchBatchRawReque defer s.readRPCCompleted() callStart := s.nowFn() - ctx := 
tchannelthrift.Context(tctx) + ctx := addSourceToContext(tctx, req.Source) start, rangeStartErr := convert.ToTime(req.RangeStart, req.RangeTimeType) end, rangeEndErr := convert.ToTime(req.RangeEnd, req.RangeTimeType) @@ -1046,13 +1054,14 @@ func (s *service) FetchBatchRawV2(tctx thrift.Context, req *rpc.FetchBatchRawV2R var ( callStart = s.nowFn() - ctx = tchannelthrift.Context(tctx) + ctx = addSourceToContext(tctx, req.Source) nsIDs = make([]ident.ID, 0, len(req.Elements)) result = rpc.NewFetchBatchRawResult_() success int retryableErrors int nonRetryableErrors int ) + for _, nsBytes := range req.NameSpaces { nsIDs = append(nsIDs, s.newID(ctx, nsBytes)) } @@ -2619,3 +2628,16 @@ func finalizeEncodedTagsFn(b []byte) { func finalizeAnnotationFn(b []byte) { apachethrift.BytesPoolPut(b) } + +func addSourceToContext(tctx thrift.Context, source []byte) context.Context { + ctx := tchannelthrift.Context(tctx) + if len(source) > 0 { + if base, ok := ctx.GoContext(); ok { + ctx.SetGoContext(goctx.WithValue(base, limits.SourceContextKey, source)) + } else { + ctx.SetGoContext(goctx.WithValue(goctx.Background(), limits.SourceContextKey, source)) + } + } + + return ctx +} diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 7d9eca227e..028e3bf3dc 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -448,7 +448,7 @@ func (r *blockRetriever) fetchBatch( continue } - if err := r.bytesReadLimit.Inc(int(entry.Size)); err != nil { + if err := r.bytesReadLimit.Inc(int(entry.Size), req.source); err != nil { req.err = err limitErr = err continue @@ -631,6 +631,13 @@ func (r *blockRetriever) Stream( req.onRetrieve = onRetrieve req.streamReqType = streamDataReq + goCtx, found := ctx.GoContext() + if found { + if source, ok := goCtx.Value(limits.SourceContextKey).([]byte); ok { + req.source = source + } + } + found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx) if err != nil { req.resultWg.Done() 
@@ -776,6 +783,7 @@ type retrieveRequest struct { blockSize time.Duration onRetrieve block.OnRetrieveBlock nsCtx namespace.Context + source []byte streamReqType streamReqType indexEntry IndexEntry @@ -947,6 +955,7 @@ func (req *retrieveRequest) resetForReuse() { req.resultWg = sync.WaitGroup{} req.finalized = false req.finalizes = 0 + req.source = nil req.shard = 0 req.id = nil req.tags = ident.EmptyTagIterator diff --git a/src/dbnode/server/options.go b/src/dbnode/server/options.go index ba082f20bc..de17ff5f76 100644 --- a/src/dbnode/server/options.go +++ b/src/dbnode/server/options.go @@ -22,15 +22,9 @@ package server import ( "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/node" - "github.com/m3db/m3/src/dbnode/storage" ) // StorageOptions are options to apply to the database storage options. type StorageOptions struct { - OnColdFlush storage.OnColdFlush - ForceColdWritesEnabled bool - TChanNodeServerFn node.NewTChanNodeServerFn - BackgroundProcessFns []storage.NewBackgroundProcessFn - NamespaceHooks storage.NamespaceHooks - NewTileAggregatorFn storage.NewTileAggregatorFn + TChanNodeServerFn node.NewTChanNodeServerFn } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index f2bbdb8819..b33b73f40e 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -141,7 +141,10 @@ type RunOptions struct { // CustomOptions are custom options to apply to the session. CustomOptions []client.CustomAdminOption - // StorageOptions are options to apply to the database storage options. + // Transforms are transforms to apply to the database storage options. + Transforms []storage.OptionTransform + + // StorageOptions are additional storage options. StorageOptions StorageOptions // CustomBuildTags are additional tags to be added to the instrument build @@ -341,14 +344,14 @@ func Run(runOpts RunOptions) { var ( opts = storage.NewOptions() - iopts = opts.InstrumentOptions(). + iOpts = opts.InstrumentOptions(). SetLogger(logger). 
SetMetricsScope(scope). SetTimerOptions(timerOpts). SetTracer(tracer). SetCustomBuildTags(runOpts.CustomBuildTags) ) - opts = opts.SetInstrumentOptions(iopts) + opts = opts.SetInstrumentOptions(iOpts) // Only override the default MemoryTracker (which has default limits) if a custom limit has // been set. @@ -368,7 +371,7 @@ func Run(runOpts RunOptions) { logger.Warn("max index query IDs concurrency was not set, falling back to default value") } - buildReporter := instrument.NewBuildReporter(iopts) + buildReporter := instrument.NewBuildReporter(iOpts) if err := buildReporter.Start(); err != nil { logger.Fatal("unable to start build reporter", zap.Error(err)) } @@ -424,9 +427,13 @@ func Run(runOpts RunOptions) { // Setup index regexp compilation cache. m3ninxindex.SetRegexpCacheOptions(m3ninxindex.RegexpCacheOptions{ Size: cfg.Cache.RegexpConfiguration().SizeOrDefault(), - Scope: iopts.MetricsScope(), + Scope: iOpts.MetricsScope(), }) + for _, transform := range runOpts.Transforms { + opts = transform(opts) + } + // Setup query stats tracking. docsLimit := limits.DefaultLookbackLimitOptions() bytesReadLimit := limits.DefaultLookbackLimitOptions() @@ -438,7 +445,14 @@ func Run(runOpts RunOptions) { bytesReadLimit.Limit = limitConfig.Value bytesReadLimit.Lookback = limitConfig.Lookback } - queryLimits, err := limits.NewQueryLimits(docsLimit, bytesReadLimit, iopts) + limitOpts := limits.NewOptions(). + SetDocsLimitOpts(docsLimit). + SetBytesReadLimitOpts(bytesReadLimit). 
+ SetInstrumentOptions(iOpts) + if builder := opts.SourceLoggerBuilder(); builder != nil { + limitOpts = limitOpts.SetSourceLoggerBuilder(builder) + } + queryLimits, err := limits.NewQueryLimits(limitOpts) if err != nil { logger.Fatal("could not construct docs query limits from config", zap.Error(err)) } @@ -624,15 +638,16 @@ func Run(runOpts RunOptions) { }() opts = opts.SetIndexClaimsManager(icm) + forceColdWrites := opts.ForceColdWritesEnabled() var envCfgResults environment.ConfigureResults if len(envConfig.Statics) == 0 { logger.Info("creating dynamic config service client with m3cluster") envCfgResults, err = envConfig.Configure(environment.ConfigurationParameters{ - InstrumentOpts: iopts, + InstrumentOpts: iOpts, HashingSeed: cfg.Hashing.Seed, NewDirectoryMode: newDirectoryMode, - ForceColdWritesEnabled: runOpts.StorageOptions.ForceColdWritesEnabled, + ForceColdWritesEnabled: forceColdWrites, }) if err != nil { logger.Fatal("could not initialize dynamic config", zap.Error(err)) @@ -641,9 +656,9 @@ func Run(runOpts RunOptions) { logger.Info("creating static config service client with m3cluster") envCfgResults, err = envConfig.Configure(environment.ConfigurationParameters{ - InstrumentOpts: iopts, + InstrumentOpts: iOpts, HostID: hostID, - ForceColdWritesEnabled: runOpts.StorageOptions.ForceColdWritesEnabled, + ForceColdWritesEnabled: forceColdWrites, }) if err != nil { logger.Fatal("could not initialize static config", zap.Error(err)) @@ -715,7 +730,7 @@ func Run(runOpts RunOptions) { if debugListenAddress != "" { var debugWriter xdebug.ZipWriter handlerOpts, err := placement.NewHandlerOptions(syncCfg.ClusterClient, - queryconfig.Configuration{}, nil, iopts) + queryconfig.Configuration{}, nil, iOpts) if err != nil { logger.Warn("could not create handler options for debug writer", zap.Error(err)) } else { @@ -738,7 +753,7 @@ func Run(runOpts RunOptions) { }, }, }, - iopts) + iOpts) if err != nil { logger.Error("unable to create debug writer", zap.Error(err)) } 
@@ -789,7 +804,7 @@ func Run(runOpts RunOptions) { origin := topology.NewHost(hostID, "") m3dbClient, err := newAdminClient( - cfg.Client, iopts, tchannelOpts, syncCfg.TopologyInitializer, + cfg.Client, iOpts, tchannelOpts, syncCfg.TopologyInitializer, runtimeOptsMgr, origin, protoEnabled, schemaRegistry, syncCfg.KVStore, logger, runOpts.CustomOptions) if err != nil { @@ -825,7 +840,7 @@ func Run(runOpts RunOptions) { // Guaranteed to not be nil if repair is enabled by config validation. clientCfg := *cluster.Client clusterClient, err := newAdminClient( - clientCfg, iopts, tchannelOpts, topologyInitializer, + clientCfg, iOpts, tchannelOpts, topologyInitializer, runtimeOptsMgr, origin, protoEnabled, schemaRegistry, syncCfg.KVStore, logger, runOpts.CustomOptions) if err != nil { @@ -865,21 +880,6 @@ func Run(runOpts RunOptions) { opts = opts.SetRepairEnabled(false) } - if runOpts.StorageOptions.OnColdFlush != nil { - opts = opts.SetOnColdFlush(runOpts.StorageOptions.OnColdFlush) - } - - opts = opts.SetBackgroundProcessFns(append(opts.BackgroundProcessFns(), runOpts.StorageOptions.BackgroundProcessFns...)) - - if runOpts.StorageOptions.NamespaceHooks != nil { - opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks) - } - - if runOpts.StorageOptions.NewTileAggregatorFn != nil { - aggregator := runOpts.StorageOptions.NewTileAggregatorFn(iopts) - opts = opts.SetTileAggregator(aggregator) - } - // Set bootstrap options - We need to create a topology map provider from the // same topology that will be passed to the cluster so that when we make // bootstrapping decisions they are in sync with the clustered database @@ -1318,7 +1318,7 @@ func withEncodingAndPoolingOptions( opts storage.Options, policy config.PoolingPolicy, ) storage.Options { - iopts := opts.InstrumentOptions() + iOpts := opts.InstrumentOptions() scope := opts.InstrumentOptions().MetricsScope() // Set the max bytes pool byte slice alloc size for the thrift pooling. 
@@ -1328,9 +1328,9 @@ func withEncodingAndPoolingOptions( apachethrift.SetMaxBytesPoolAlloc(thriftBytesAllocSize) bytesPoolOpts := pool.NewObjectPoolOptions(). - SetInstrumentOptions(iopts.SetMetricsScope(scope.SubScope("bytes-pool"))) + SetInstrumentOptions(iOpts.SetMetricsScope(scope.SubScope("bytes-pool"))) checkedBytesPoolOpts := bytesPoolOpts. - SetInstrumentOptions(iopts.SetMetricsScope(scope.SubScope("checked-bytes-pool"))) + SetInstrumentOptions(iOpts.SetMetricsScope(scope.SubScope("checked-bytes-pool"))) buckets := make([]pool.Bucket, len(policy.BytesPool.Buckets)) for i, bucket := range policy.BytesPool.Buckets { @@ -1555,7 +1555,7 @@ func withEncodingAndPoolingOptions( runtimeOpts = opts.RuntimeOptionsManager() wiredListOpts = block.WiredListOptions{ RuntimeOptionsManager: runtimeOpts, - InstrumentOptions: iopts, + InstrumentOptions: iOpts, ClockOptions: opts.ClockOptions(), } lruCfg = cfg.Cache.SeriesConfiguration().LRU @@ -1615,15 +1615,15 @@ func withEncodingAndPoolingOptions( // Set index options. indexOpts := opts.IndexOptions(). - SetInstrumentOptions(iopts). + SetInstrumentOptions(iOpts). SetMemSegmentOptions( opts.IndexOptions().MemSegmentOptions(). SetPostingsListPool(postingsList). - SetInstrumentOptions(iopts)). + SetInstrumentOptions(iOpts)). SetFSTSegmentOptions( opts.IndexOptions().FSTSegmentOptions(). SetPostingsListPool(postingsList). - SetInstrumentOptions(iopts). + SetInstrumentOptions(iOpts). SetContextPool(opts.ContextPool())). SetSegmentBuilderOptions( opts.IndexOptions().SegmentBuilderOptions(). @@ -1657,7 +1657,7 @@ func withEncodingAndPoolingOptions( func newAdminClient( config client.Configuration, - iopts instrument.Options, + iOpts instrument.Options, tchannelOpts *tchannel.ChannelOptions, topologyInitializer topology.Initializer, runtimeOptsMgr m3dbruntime.OptionsManager, @@ -1702,8 +1702,8 @@ func newAdminClient( options = append(options, custom...) 
m3dbClient, err := config.NewAdminClient( client.ConfigurationParameters{ - InstrumentOptions: iopts. - SetMetricsScope(iopts.MetricsScope().SubScope("m3dbclient")), + InstrumentOptions: iOpts. + SetMetricsScope(iOpts.MetricsScope().SubScope("m3dbclient")), TopologyInitializer: topologyInitializer, }, options..., diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 120fe09a3d..cbb641e6e5 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -471,6 +471,7 @@ func (b *block) queryWithSpan( cancellable.ReleaseCheckout() var ( + source = opts.Source iterCloser = safeCloser{closable: iter} size = results.Size() docsCount = results.TotalDocsCount() @@ -498,7 +499,7 @@ func (b *block) queryWithSpan( continue } - batch, size, docsCount, err = b.addQueryResults(cancellable, results, batch) + batch, size, docsCount, err = b.addQueryResults(cancellable, results, batch, source) if err != nil { return false, err } @@ -506,7 +507,7 @@ func (b *block) queryWithSpan( // Add last batch to results if remaining. if len(batch) > 0 { - batch, size, docsCount, err = b.addQueryResults(cancellable, results, batch) + batch, size, docsCount, err = b.addQueryResults(cancellable, results, batch, source) if err != nil { return false, err } @@ -533,10 +534,11 @@ func (b *block) addQueryResults( cancellable *xresource.CancellableLifetime, results BaseResults, batch []doc.Document, + source []byte, ) ([]doc.Document, int, int, error) { // update recently queried docs to monitor memory. 
if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch)); err != nil { + if err := b.docsLimit.Inc(len(batch), source); err != nil { return batch, 0, 0, err } } @@ -639,6 +641,7 @@ func (b *block) aggregateWithSpan( } var ( + source = opts.Source size = results.Size() docsCount = results.TotalDocsCount() batch = b.opts.AggregateResultsEntryArrayPool().Get() @@ -694,7 +697,7 @@ func (b *block) aggregateWithSpan( continue } - batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch) + batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source) if err != nil { return false, err } @@ -712,7 +715,7 @@ func (b *block) aggregateWithSpan( // Add last batch to results if remaining. if len(batch) > 0 { - batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch) + batch, size, docsCount, err = b.addAggregateResults(cancellable, results, batch, source) if err != nil { return false, err } @@ -795,10 +798,11 @@ func (b *block) addAggregateResults( cancellable *xresource.CancellableLifetime, results AggregateResults, batch []AggregateResultsEntry, + source []byte, ) ([]AggregateResultsEntry, int, int, error) { // update recently queried docs to monitor memory. if results.EnforceLimits() { - if err := b.docsLimit.Inc(len(batch)); err != nil { + if err := b.docsLimit.Inc(len(batch), source); err != nil { return batch, 0, 0, err } } diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index ebceb8f8e4..8dd26d6025 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -92,6 +92,8 @@ type QueryOptions struct { RequireExhaustive bool // IterationOptions controls additional iteration methods. IterationOptions IterationOptions + // Source is an optional query source. 
+ Source []byte } // WideQueryOptions enables users to specify constraints and diff --git a/src/dbnode/storage/limits/noop_query_limits.go b/src/dbnode/storage/limits/noop_query_limits.go index 4d940de4f9..672ea8d52d 100644 --- a/src/dbnode/storage/limits/noop_query_limits.go +++ b/src/dbnode/storage/limits/noop_query_limits.go @@ -54,6 +54,6 @@ func (q *noOpQueryLimits) Stop() { func (q *noOpQueryLimits) Start() { } -func (q *noOpLookbackLimit) Inc(int) error { +func (q *noOpLookbackLimit) Inc(int, []byte) error { return nil } diff --git a/src/dbnode/storage/limits/options.go b/src/dbnode/storage/limits/options.go new file mode 100644 index 0000000000..5f8c49a89b --- /dev/null +++ b/src/dbnode/storage/limits/options.go @@ -0,0 +1,107 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package limits + +import ( + "errors" + "fmt" + + "github.com/m3db/m3/src/x/instrument" +) + +type limitOpts struct { + iOpts instrument.Options + docsLimitOpts LookbackLimitOptions + bytesReadLimitOpts LookbackLimitOptions + sourceLoggerBuilder SourceLoggerBuilder +} + +// NewOptions creates limit options with default values. +func NewOptions() Options { + return &limitOpts{ + sourceLoggerBuilder: &sourceLoggerBuilder{}, + } +} + +// Validate validates the options. +func (o *limitOpts) Validate() error { + if o.iOpts == nil { + return errors.New("limit options invalid: no instrument options") + } + + if err := o.docsLimitOpts.validate(); err != nil { + return fmt.Errorf("doc limit options invalid: %w", err) + } + + if err := o.bytesReadLimitOpts.validate(); err != nil { + return fmt.Errorf("bytes limit options invalid: %w", err) + } + + return nil +} + +// SetInstrumentOptions sets the instrument options. +func (o *limitOpts) SetInstrumentOptions(value instrument.Options) Options { + opts := *o + opts.iOpts = value + return &opts +} + +// InstrumentOptions returns the instrument options. +func (o *limitOpts) InstrumentOptions() instrument.Options { + return o.iOpts +} + +// SetDocsLimitOpts sets the doc limit options. +func (o *limitOpts) SetDocsLimitOpts(value LookbackLimitOptions) Options { + opts := *o + opts.docsLimitOpts = value + return &opts +} + +// DocsLimitOpts returns the doc limit options. +func (o *limitOpts) DocsLimitOpts() LookbackLimitOptions { + return o.docsLimitOpts +} + +// SetBytesReadLimitOpts sets the byte read limit options. +func (o *limitOpts) SetBytesReadLimitOpts(value LookbackLimitOptions) Options { + opts := *o + opts.bytesReadLimitOpts = value + return &opts +} + +// BytesReadLimitOpts returns the byte read limit options. +func (o *limitOpts) BytesReadLimitOpts() LookbackLimitOptions { + return o.bytesReadLimitOpts +} + +// SetSourceLoggerBuilder sets the source logger. 
+func (o *limitOpts) SetSourceLoggerBuilder(value SourceLoggerBuilder) Options { + opts := *o + opts.sourceLoggerBuilder = value + return &opts +} + +// SourceLoggerBuilder sets the source logger. +func (o *limitOpts) SourceLoggerBuilder() SourceLoggerBuilder { + return o.sourceLoggerBuilder +} diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index d9840571a4..49274f83b7 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -31,7 +31,9 @@ import ( "go.uber.org/atomic" ) -const defaultLookback = time.Second * 15 +const ( + defaultLookback = time.Second * 15 +) type queryLimits struct { docsLimit *lookbackLimit @@ -51,6 +53,8 @@ type lookbackLimitMetrics struct { recentMax tally.Gauge total tally.Counter exceeded tally.Counter + + sourceLogger SourceLogger } var ( @@ -69,18 +73,27 @@ func DefaultLookbackLimitOptions() LookbackLimitOptions { // NewQueryLimits returns a new query limits manager. 
func NewQueryLimits( - docsLimitOpts LookbackLimitOptions, - bytesReadLimitOpts LookbackLimitOptions, - instrumentOpts instrument.Options, + options Options, + // docsLimitOpts LookbackLimitOptions, + // bytesReadLimitOpts LookbackLimitOptions, + // instrumentOpts instrument.Options, ) (QueryLimits, error) { - if err := docsLimitOpts.validate(); err != nil { + if err := options.Validate(); err != nil { return nil, err } - if err := bytesReadLimitOpts.validate(); err != nil { - return nil, err - } - docsLimit := newLookbackLimit(instrumentOpts, docsLimitOpts, "docs-matched") - bytesReadLimit := newLookbackLimit(instrumentOpts, bytesReadLimitOpts, "disk-bytes-read") + + var ( + iOpts = options.InstrumentOptions() + docsLimitOpts = options.DocsLimitOpts() + bytesReadLimitOpts = options.BytesReadLimitOpts() + sourceLoggerBuilder = options.SourceLoggerBuilder() + + docsLimit = newLookbackLimit( + iOpts, docsLimitOpts, "docs-matched", sourceLoggerBuilder) + bytesReadLimit = newLookbackLimit( + iOpts, bytesReadLimitOpts, "disk-bytes-read", sourceLoggerBuilder) + ) + return &queryLimits{ docsLimit: docsLimit, bytesReadLimit: bytesReadLimit, @@ -91,25 +104,36 @@ func newLookbackLimit( instrumentOpts instrument.Options, opts LookbackLimitOptions, name string, + sourceLoggerBuilder SourceLoggerBuilder, ) *lookbackLimit { return &lookbackLimit{ name: name, options: opts, - metrics: newLookbackLimitMetrics(instrumentOpts, name), + metrics: newLookbackLimitMetrics(instrumentOpts, name, sourceLoggerBuilder), recent: atomic.NewInt64(0), stopCh: make(chan struct{}), } } -func newLookbackLimitMetrics(instrumentOpts instrument.Options, name string) lookbackLimitMetrics { +func newLookbackLimitMetrics( + instrumentOpts instrument.Options, + name string, + sourceLoggerBuilder SourceLoggerBuilder, +) lookbackLimitMetrics { scope := instrumentOpts. MetricsScope(). 
SubScope("query-limit") + + sourceLogger := sourceLoggerBuilder.NewSourceLogger(name, + instrumentOpts.SetMetricsScope(scope)) + return lookbackLimitMetrics{ recentCount: scope.Gauge(fmt.Sprintf("recent-count-%s", name)), recentMax: scope.Gauge(fmt.Sprintf("recent-max-%s", name)), total: scope.Counter(fmt.Sprintf("total-%s", name)), exceeded: scope.Tagged(map[string]string{"limit": name}).Counter("exceeded"), + + sourceLogger: sourceLogger, } } @@ -139,7 +163,7 @@ func (q *queryLimits) AnyExceeded() error { } // Inc increments the current value and returns an error if above the limit. -func (q *lookbackLimit) Inc(val int) error { +func (q *lookbackLimit) Inc(val int, source []byte) error { if val < 0 { return fmt.Errorf("invalid negative query limit inc %d", val) } @@ -155,6 +179,8 @@ func (q *lookbackLimit) Inc(val int) error { q.metrics.recentCount.Update(float64(recent)) q.metrics.total.Inc(valI64) + q.metrics.sourceLogger.LogSourceValue(valI64, source) + // Enforce limit (if specified). return q.checkLimit(recent) } diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 834427ee92..5ea202bbc1 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -34,6 +34,17 @@ import ( "github.com/uber-go/tally" ) +func testQueryLimitOptions( + docOpts LookbackLimitOptions, + bytesOpts LookbackLimitOptions, + iOpts instrument.Options, +) Options { + return NewOptions(). + SetDocsLimitOpts(docOpts). + SetBytesReadLimitOpts(bytesOpts). 
+ SetInstrumentOptions(iOpts) +} + func TestQueryLimits(t *testing.T) { docOpts := LookbackLimitOptions{ Limit: 1, @@ -43,7 +54,8 @@ func TestQueryLimits(t *testing.T) { Limit: 1, Lookback: time.Second, } - queryLimits, err := NewQueryLimits(docOpts, bytesOpts, instrument.NewOptions()) + opts := testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) + queryLimits, err := NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) @@ -51,12 +63,13 @@ func TestQueryLimits(t *testing.T) { require.NoError(t, queryLimits.AnyExceeded()) // Limit from docs. - queryLimits.DocsLimit().Inc(2) + require.Error(t, queryLimits.DocsLimit().Inc(2, nil)) err = queryLimits.AnyExceeded() require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) - queryLimits, err = NewQueryLimits(docOpts, bytesOpts, instrument.NewOptions()) + opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) + queryLimits, err = NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) @@ -65,7 +78,7 @@ func TestQueryLimits(t *testing.T) { require.NoError(t, err) // Limit from bytes. 
- queryLimits.BytesReadLimit().Inc(2) + require.Error(t, queryLimits.BytesReadLimit().Inc(2, nil)) err = queryLimits.AnyExceeded() require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) @@ -87,7 +100,7 @@ func TestLookbackLimit(t *testing.T) { Lookback: time.Millisecond * 100, } name := "test" - limit := newLookbackLimit(iOpts, opts, name) + limit := newLookbackLimit(iOpts, opts, name, &sourceLoggerBuilder{}) require.Equal(t, int64(0), limit.current()) err := limit.exceeded() @@ -142,7 +155,7 @@ func TestLookbackLimit(t *testing.T) { func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int64) int64 { var exceededCount int64 - err := limit.Inc(inc) + err := limit.Inc(inc, nil) if limit.current() <= expectedLimit || expectedLimit == 0 { require.NoError(t, err) } else { @@ -169,9 +182,9 @@ func TestLookbackReset(t *testing.T) { Lookback: time.Millisecond * 100, } name := "test" - limit := newLookbackLimit(iOpts, opts, name) + limit := newLookbackLimit(iOpts, opts, name, &sourceLoggerBuilder{}) - err := limit.Inc(3) + err := limit.Inc(3, nil) require.NoError(t, err) require.Equal(t, int64(3), limit.current()) @@ -264,3 +277,65 @@ func verifyMetrics(t *testing.T, assert.True(t, exists) assert.Equal(t, expectedExceeded, exceeded.Value(), "exceeded wrong") } + +type testLoggerRecord struct { + name string + val int64 + source []byte +} + +func TestSourceLogger(t *testing.T) { + var ( + scope = tally.NewTestScope("test", nil) + iOpts = instrument.NewOptions().SetMetricsScope(scope) + noLimit = LookbackLimitOptions{ + Limit: 0, + Lookback: time.Millisecond * 100, + } + + builder = &testBuilder{records: []testLoggerRecord{}} + opts = testQueryLimitOptions(noLimit, noLimit, iOpts). 
+ SetSourceLoggerBuilder(builder) + ) + + require.NoError(t, opts.Validate()) + + queryLimits, err := NewQueryLimits(opts) + require.NoError(t, err) + require.NotNil(t, queryLimits) + + require.NoError(t, queryLimits.DocsLimit().Inc(100, []byte("docs"))) + require.NoError(t, queryLimits.BytesReadLimit().Inc(200, []byte("bytes"))) + + assert.Equal(t, []testLoggerRecord{ + {name: "docs-matched", val: 100, source: []byte("docs")}, + {name: "disk-bytes-read", val: 200, source: []byte("bytes")}, + }, builder.records) +} + +// NB: creates test logger records that share an underlying record set, +// differentiated by source logger name. +type testBuilder struct { + records []testLoggerRecord +} + +var _ SourceLoggerBuilder = (*testBuilder)(nil) + +func (s *testBuilder) NewSourceLogger(n string, _ instrument.Options) SourceLogger { + return &testSourceLogger{name: n, builder: s} +} + +type testSourceLogger struct { + name string + builder *testBuilder +} + +var _ SourceLogger = (*testSourceLogger)(nil) + +func (l *testSourceLogger) LogSourceValue(val int64, source []byte) { + l.builder.records = append(l.builder.records, testLoggerRecord{ + name: l.name, + val: val, + source: source, + }) +} diff --git a/src/dbnode/storage/limits/source_logger.go b/src/dbnode/storage/limits/source_logger.go new file mode 100644 index 0000000000..4d0abc47f8 --- /dev/null +++ b/src/dbnode/storage/limits/source_logger.go @@ -0,0 +1,72 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package limits + +import ( + "fmt" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/m3db/m3/src/x/instrument" +) + +// NB: log queries that are over 1 GB. +const defaultLimit = 1024 * 1024 * 1024 + +type sourceLoggerBuilder struct{} + +var _ SourceLoggerBuilder = (*sourceLoggerBuilder)(nil) + +func (s *sourceLoggerBuilder) NewSourceLogger( + name string, + iOpts instrument.Options, +) SourceLogger { + var ( + logger = iOpts.Logger() + debugLog = logger.Check(zapcore.DebugLevel, fmt.Sprint("source logger for", name)) + ) + + if debugLog == nil { + // NB: use empty logger. 
+ return &sourceLogger{} + } + + return &sourceLogger{logger: logger.With(zap.String("limit_name", name))} +} + +type sourceLogger struct { + logger *zap.Logger +} + +var _ SourceLogger = (*sourceLogger)(nil) + +func (l *sourceLogger) LogSourceValue(val int64, source []byte) { + if l.logger == nil || val < defaultLimit || len(source) == 0 { + // NB: Don't log each source as it would be very noisy, even at debug level. + return + } + + l.logger.Debug("query from source exceeded size", + zap.ByteString("source", source), + zap.Int64("size", val), + zap.Int64("limit", defaultLimit)) +} diff --git a/src/dbnode/storage/limits/source_logger_test.go b/src/dbnode/storage/limits/source_logger_test.go new file mode 100644 index 0000000000..39970e37fa --- /dev/null +++ b/src/dbnode/storage/limits/source_logger_test.go @@ -0,0 +1,92 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package limits + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/m3db/m3/src/x/instrument" +) + +func newTestZapHookOptions(logs *[]string) zap.Option { + // NB: allows inspection of incoming logs by writing them to slice. + return zap.Hooks(func(entry zapcore.Entry) error { + *logs = append(*logs, entry.Message) + return nil + }) +} + +func TestNewSourceLoggerAboveDebugLevelDoesNotLog(t *testing.T) { + testSourceLoggerWithLevel(t, false) +} + +func TestNewSourceLoggerAtDebugLevelLogsWhenExceeded(t *testing.T) { + testSourceLoggerWithLevel(t, true) +} + +func testSourceLoggerWithLevel(t *testing.T, debugLog bool) { + var ( + err error + logger *zap.Logger + + writtenLogs = []string{} + zapHookOpts = newTestZapHookOptions(&writtenLogs) + ) + + if debugLog { + logger, err = zap.NewDevelopment(zapHookOpts) + } else { + logger, err = zap.NewProduction(zapHookOpts) + } + + require.NoError(t, err) + var ( + iOpts = instrument.NewOptions().SetLogger(logger) + builder = &sourceLoggerBuilder{} + sourceLogger = builder.NewSourceLogger("name", iOpts) + ) + + sourceLogger.LogSourceValue(1, []byte("foo")) + assert.Equal(t, 0, len(writtenLogs)) + + sourceLogger.LogSourceValue(defaultLimit-1, []byte("defaultLimit-1")) + assert.Equal(t, 0, len(writtenLogs)) + + sourceLogger.LogSourceValue(defaultLimit, []byte("defaultLimit")) + if debugLog { + assert.Equal(t, 1, len(writtenLogs)) + } else { + assert.Equal(t, 0, len(writtenLogs)) + } + + sourceLogger.LogSourceValue(math.MaxInt64, []byte("maxInt")) + if debugLog { + assert.Equal(t, 2, len(writtenLogs)) + } else { + assert.Equal(t, 0, len(writtenLogs)) + } +} diff --git a/src/dbnode/storage/limits/types.go b/src/dbnode/storage/limits/types.go index c187f35a60..d38db00452 100644 --- a/src/dbnode/storage/limits/types.go +++ b/src/dbnode/storage/limits/types.go @@ -18,12 +18,21 @@ // OUT OF OR IN CONNECTION 
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// Package limits contains paths to enforce read query limits. package limits import ( "time" + + "github.com/m3db/m3/src/x/instrument" ) +// Key is a specific string type for context setting. +type Key string + +// SourceContextKey is the key for setting and retrieving source from context. +const SourceContextKey Key = "source" + // QueryLimits provides an interface for managing query limits. type QueryLimits interface { // DocsLimit limits queries by a global concurrent count of index docs matched. @@ -42,7 +51,7 @@ type QueryLimits interface { // LookbackLimit provides an interface for a specific query limit. type LookbackLimit interface { // Inc increments the recent value for the limit. - Inc(new int) error + Inc(new int, source []byte) error } // LookbackLimitOptions holds options for a lookback limit to be enforced. @@ -52,3 +61,45 @@ type LookbackLimitOptions struct { // Lookback is the period over which the limit is enforced. Lookback time.Duration } + +// SourceLoggerBuilder builds a SourceLogger given instrument options. +type SourceLoggerBuilder interface { + // NewSourceLogger builds a source logger. + NewSourceLogger(name string, opts instrument.Options) SourceLogger +} + +// SourceLogger attributes limit values to a source. +type SourceLogger interface { + // LogSourceValue attributes values that exceed a given size to the source. + LogSourceValue(val int64, source []byte) +} + +// Options is a set of limit options. +type Options interface { + // Validate validates the options. + Validate() error + + // SetInstrumentOptions sets the instrument options. + SetInstrumentOptions(value instrument.Options) Options + + // InstrumentOptions returns the instrument options. + InstrumentOptions() instrument.Options + + // SetDocsLimitOpts sets the doc limit options. + SetDocsLimitOpts(value LookbackLimitOptions) Options + + // DocsLimitOpts returns the doc limit options. 
+ DocsLimitOpts() LookbackLimitOptions + + // SetBytesReadLimitOpts sets the byte read limit options. + SetBytesReadLimitOpts(value LookbackLimitOptions) Options + + // BytesReadLimitOpts returns the byte read limit options. + BytesReadLimitOpts() LookbackLimitOptions + + // SetSourceLoggerBuilder sets the source logger. + SetSourceLoggerBuilder(value SourceLoggerBuilder) Options + + // SourceLogger sets the source logger. + SourceLoggerBuilder() SourceLoggerBuilder +} diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 2eda82f7b4..e377ec7b3f 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -167,6 +168,8 @@ type options struct { schemaReg namespace.SchemaRegistry blockLeaseManager block.LeaseManager onColdFlush OnColdFlush + forceColdWritesEnabled bool + sourceLoggerBuilder limits.SourceLoggerBuilder iterationOptions index.IterationOptions memoryTracker MemoryTracker mmapReporter mmap.Reporter @@ -791,6 +794,26 @@ func (o *options) OnColdFlush() OnColdFlush { return o.onColdFlush } +func (o *options) SetForceColdWritesEnabled(value bool) Options { + opts := *o + opts.forceColdWritesEnabled = value + return &opts +} + +func (o *options) ForceColdWritesEnabled() bool { + return o.forceColdWritesEnabled +} + +func (o *options) SetSourceLoggerBuilder(value limits.SourceLoggerBuilder) Options { + opts := *o + opts.sourceLoggerBuilder = value + return &opts +} + +func (o *options) SourceLoggerBuilder() limits.SourceLoggerBuilder { + return o.sourceLoggerBuilder +} + func (o *options) SetIterationOptions(value index.IterationOptions) Options { opts := *o 
// NOTE(review): this file is generated by mockgen. When storage.Options
// changes, regenerate the mocks rather than hand-editing this file, so the
// mocks cannot drift from the interface.

// SetForceColdWritesEnabled mocks base method
func (m *MockOptions) SetForceColdWritesEnabled(value bool) Options {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "SetForceColdWritesEnabled", value)
	ret0, _ := ret[0].(Options)
	return ret0
}

// SetForceColdWritesEnabled indicates an expected call of SetForceColdWritesEnabled
func (mr *MockOptionsMockRecorder) SetForceColdWritesEnabled(value interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForceColdWritesEnabled", reflect.TypeOf((*MockOptions)(nil).SetForceColdWritesEnabled), value)
}

// ForceColdWritesEnabled mocks base method
func (m *MockOptions) ForceColdWritesEnabled() bool {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "ForceColdWritesEnabled")
	ret0, _ := ret[0].(bool)
	return ret0
}

// ForceColdWritesEnabled indicates an expected call of ForceColdWritesEnabled
func (mr *MockOptionsMockRecorder) ForceColdWritesEnabled() *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForceColdWritesEnabled", reflect.TypeOf((*MockOptions)(nil).ForceColdWritesEnabled))
}

// SetSourceLoggerBuilder mocks base method
func (m *MockOptions) SetSourceLoggerBuilder(value limits.SourceLoggerBuilder) Options {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "SetSourceLoggerBuilder", value)
	ret0, _ := ret[0].(Options)
	return ret0
}

// SetSourceLoggerBuilder indicates an expected call of SetSourceLoggerBuilder
func (mr *MockOptionsMockRecorder) SetSourceLoggerBuilder(value interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSourceLoggerBuilder", reflect.TypeOf((*MockOptions)(nil).SetSourceLoggerBuilder), value)
}

// SourceLoggerBuilder mocks base method
func (m *MockOptions) SourceLoggerBuilder() limits.SourceLoggerBuilder {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "SourceLoggerBuilder")
	ret0, _ := ret[0].(limits.SourceLoggerBuilder)
	return ret0
}

// SourceLoggerBuilder indicates an expected call of SourceLoggerBuilder
func (mr *MockOptionsMockRecorder) SourceLoggerBuilder() *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SourceLoggerBuilder", reflect.TypeOf((*MockOptions)(nil).SourceLoggerBuilder))
}
+type OptionTransform func(Options) Options + // Options represents the options for storage. type Options interface { // Validate validates assumptions baked into the code. @@ -1243,6 +1247,18 @@ type Options interface { // OnColdFlush returns the on cold flush processor. OnColdFlush() OnColdFlush + // SetForceColdWritesEnabled sets options for forcing cold writes. + SetForceColdWritesEnabled(value bool) Options + + // SetForceColdWritesEnabled returns options for forcing cold writes. + ForceColdWritesEnabled() bool + + // SetSourceLoggerBuilder sets the limit source logger builder. + SetSourceLoggerBuilder(value limits.SourceLoggerBuilder) Options + + // SetSourceLoggerBuilder returns the limit source logger builder. + SourceLoggerBuilder() limits.SourceLoggerBuilder + // SetMemoryTracker sets the MemoryTracker. SetMemoryTracker(memTracker MemoryTracker) Options diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index f5117aff51..489531f7ad 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -143,6 +143,10 @@ func (h *renderHandler) serveHTTP( wg.Done() }() + if source := r.Header.Get(headers.SourceHeader); len(source) > 0 { + childCtx.Source = []byte(source) + } + exp, err := h.engine.Compile(target) if err != nil { sendError(errorCh, errors.NewRenamedError(err, diff --git a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go index 1d201d9e25..ac34a43f05 100644 --- a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go +++ b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go @@ -147,6 +147,10 @@ func (b fetchOptionsBuilder) newFetchOptions( ) (*storage.FetchOptions, error) { fetchOpts := storage.NewFetchOptions() + if source := req.Header.Get(headers.SourceHeader); len(source) > 0 { + fetchOpts.Source = []byte(source) + } + seriesLimit, 
err := ParseLimit(req, headers.LimitMaxSeriesHeader, "limit", b.opts.Limits.SeriesLimit) if err != nil { diff --git a/src/query/generated/proto/rpcpb/query.pb.go b/src/query/generated/proto/rpcpb/query.pb.go index d7069a46c9..6b9805963a 100644 --- a/src/query/generated/proto/rpcpb/query.pb.go +++ b/src/query/generated/proto/rpcpb/query.pb.go @@ -402,7 +402,8 @@ type FetchOptions struct { Aggregated FanoutOption `protobuf:"varint,5,opt,name=aggregated,proto3,enum=rpc.FanoutOption" json:"aggregated,omitempty"` AggregatedOptimized FanoutOption `protobuf:"varint,6,opt,name=aggregatedOptimized,proto3,enum=rpc.FanoutOption" json:"aggregatedOptimized,omitempty"` // Deprecated: all requests will include resolution. - IncludeResolution bool `protobuf:"varint,7,opt,name=includeResolution,proto3" json:"includeResolution,omitempty"` + IncludeResolution bool `protobuf:"varint,7,opt,name=includeResolution,proto3" json:"includeResolution,omitempty"` + Source []byte `protobuf:"bytes,8,opt,name=source,proto3" json:"source,omitempty"` } func (m *FetchOptions) Reset() { *m = FetchOptions{} } @@ -459,6 +460,13 @@ func (m *FetchOptions) GetIncludeResolution() bool { return false } +func (m *FetchOptions) GetSource() []byte { + if m != nil { + return m.Source + } + return nil +} + type RestrictQueryOptions struct { RestrictQueryType *RestrictQueryType `protobuf:"bytes,3,opt,name=restrictQueryType" json:"restrictQueryType,omitempty"` RestrictQueryTags *RestrictQueryTags `protobuf:"bytes,4,opt,name=restrictQueryTags" json:"restrictQueryTags,omitempty"` @@ -2144,6 +2152,12 @@ func (m *FetchOptions) MarshalTo(dAtA []byte) (int, error) { } i++ } + if len(m.Source) > 0 { + dAtA[i] = 0x42 + i++ + i = encodeVarintQuery(dAtA, i, uint64(len(m.Source))) + i += copy(dAtA[i:], m.Source) + } return i, nil } @@ -3334,6 +3348,10 @@ func (m *FetchOptions) Size() (n int) { if m.IncludeResolution { n += 2 } + l = len(m.Source) + if l > 0 { + n += 1 + l + sovQuery(uint64(l)) + } return n } @@ -4522,6 
+4540,37 @@ func (m *FetchOptions) Unmarshal(dAtA []byte) error { } } m.IncludeResolution = bool(v != 0) + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Source", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Source = append(m.Source[:0], dAtA[iNdEx:postIndex]...) + if m.Source == nil { + m.Source = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipQuery(dAtA[iNdEx:]) @@ -7805,110 +7854,110 @@ func init() { } var fileDescriptorQuery = []byte{ - // 1667 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x58, 0xdb, 0x72, 0xe3, 0x48, - 0x19, 0xb6, 0xac, 0xf8, 0xf4, 0xfb, 0x10, 0xa7, 0x13, 0x76, 0x9c, 0x30, 0x04, 0x97, 0x80, 0x25, - 0x64, 0x87, 0x38, 0xe3, 0xcc, 0xb2, 0x2c, 0x55, 0x1c, 0x9c, 0xd8, 0x93, 0xa4, 0x36, 0xb1, 0xb3, - 0x6d, 0x85, 0x19, 0x28, 0xa8, 0xd0, 0x96, 0x7b, 0x14, 0x55, 0xac, 0xc3, 0x4a, 0xf2, 0xb2, 0xd9, - 0xe2, 0x0d, 0xb8, 0xa1, 0x28, 0x9e, 0x00, 0x0a, 0x9e, 0x60, 0x1f, 0x81, 0x0b, 0x2e, 0x79, 0x04, - 0x6a, 0xb8, 0xd9, 0xc7, 0xd8, 0xea, 0x56, 0xeb, 0x64, 0x29, 0x35, 0x53, 0x73, 0xa7, 0xff, 0xdc, - 0xff, 0xdf, 0x5f, 0x7f, 0xdd, 0x36, 0xfc, 0x42, 0x37, 0xfc, 0xdb, 0xe5, 0xec, 0x40, 0xb3, 0xcd, - 0x9e, 0x79, 0x34, 0x9f, 0xf5, 0xcc, 0xa3, 0x9e, 0xe7, 0x6a, 0xbd, 0xcf, 0x96, 0xd4, 0xbd, 0xef, - 0xe9, 0xd4, 0xa2, 0x2e, 0xf1, 0xe9, 0xbc, 0xe7, 0xb8, 0xb6, 0x6f, 0xf7, 0x5c, 0x47, 0x73, 0x66, - 0x81, 0xed, 0x80, 0x6b, 0x90, 0xec, 0x3a, 0xda, 0xce, 0xf0, 0x81, 0x24, 0x26, 0xf5, 0x5d, 0x43, - 0xf3, 0x32, 0x69, 0x1c, 0x7b, 0x61, 0x68, 0xf7, 0xce, 0x4c, 0x7c, 0x04, 0xa9, 0x94, 
0x75, 0x68, - 0x9e, 0x51, 0xb2, 0xf0, 0x6f, 0x31, 0xfd, 0x6c, 0x49, 0x3d, 0x5f, 0x79, 0x05, 0xad, 0x50, 0xe1, - 0x39, 0xb6, 0xe5, 0x51, 0xf4, 0x3e, 0xb4, 0x96, 0x8e, 0x6f, 0x98, 0x74, 0xb8, 0x74, 0x89, 0x6f, - 0xd8, 0x56, 0x47, 0xea, 0x4a, 0x7b, 0x35, 0xbc, 0xa2, 0x45, 0x4f, 0x60, 0x23, 0xd0, 0x8c, 0x89, - 0x65, 0x7b, 0x54, 0xb3, 0xad, 0xb9, 0xd7, 0x29, 0x76, 0xa5, 0x3d, 0x19, 0x67, 0x0d, 0xca, 0x3f, - 0x25, 0x68, 0x3c, 0xa7, 0xbe, 0x16, 0x16, 0x46, 0x5b, 0x50, 0xf2, 0x7c, 0xe2, 0xfa, 0x3c, 0xbb, - 0x8c, 0x03, 0x01, 0xb5, 0x41, 0xa6, 0xd6, 0x5c, 0xa4, 0x61, 0x9f, 0xe8, 0x19, 0xd4, 0x7d, 0xa2, - 0x5f, 0x12, 0x5f, 0xbb, 0xa5, 0xae, 0xd7, 0x91, 0xbb, 0xd2, 0x5e, 0xbd, 0xdf, 0x3e, 0x70, 0x1d, - 0xed, 0x40, 0x8d, 0xf5, 0x67, 0x05, 0x9c, 0x74, 0x43, 0x1f, 0x40, 0xc5, 0x76, 0xd8, 0x32, 0xbd, - 0xce, 0x1a, 0x8f, 0xd8, 0xe0, 0x11, 0x7c, 0x05, 0x93, 0xc0, 0x80, 0x43, 0x8f, 0x63, 0x80, 0xaa, - 0x29, 0x02, 0x95, 0x5f, 0x41, 0x3d, 0x91, 0x16, 0x3d, 0x4d, 0x57, 0x97, 0xba, 0xf2, 0x5e, 0xbd, - 0xbf, 0xbe, 0x52, 0x3d, 0x55, 0x5a, 0xf9, 0x1d, 0x40, 0x6c, 0x42, 0x08, 0xd6, 0x2c, 0x62, 0x52, - 0xde, 0x65, 0x03, 0xf3, 0x6f, 0xd6, 0xfa, 0xe7, 0x64, 0xb1, 0xa4, 0xbc, 0xcd, 0x06, 0x0e, 0x04, - 0xf4, 0x7d, 0x58, 0xf3, 0xef, 0x1d, 0xca, 0x3b, 0x6c, 0x89, 0x0e, 0x45, 0x16, 0xf5, 0xde, 0xa1, - 0x98, 0x5b, 0x95, 0xaf, 0x8b, 0x62, 0x8e, 0xa2, 0x0b, 0x96, 0x6c, 0x61, 0x98, 0x46, 0x34, 0x47, - 0x2e, 0xa0, 0x0f, 0xa1, 0xea, 0x52, 0x8f, 0x21, 0xc3, 0xe7, 0x55, 0xea, 0xfd, 0x6d, 0x9e, 0x10, - 0x0b, 0xe5, 0xa7, 0x0c, 0x5e, 0xe1, 0x20, 0x22, 0x57, 0xb4, 0x0f, 0xed, 0x85, 0x6d, 0xdf, 0xcd, - 0x88, 0x76, 0x17, 0xed, 0xbe, 0xcc, 0xf3, 0x66, 0xf4, 0xe8, 0x43, 0x68, 0x2c, 0x2d, 0xa2, 0xeb, - 0x2e, 0xd5, 0x19, 0xec, 0xf8, 0x9c, 0x5b, 0xe1, 0x9c, 0x89, 0x65, 0x2f, 0xfd, 0x20, 0x3f, 0x4e, - 0xb9, 0xa1, 0xa7, 0x00, 0x89, 0xa0, 0xd2, 0x43, 0x41, 0x09, 0x27, 0x74, 0x02, 0x9b, 0xb1, 0xc4, - 0xec, 0xa6, 0xf1, 0x25, 0x9d, 0x77, 0xca, 0x0f, 0xc5, 0xe6, 0x79, 0xa3, 0x43, 0xd8, 0x30, 0x2c, - 0x6d, 0xb1, 0x9c, 0x53, 
0x4c, 0x3d, 0x7b, 0xb1, 0xe4, 0xbd, 0x55, 0xba, 0xd2, 0x5e, 0xf5, 0xb8, - 0xd8, 0x91, 0x70, 0xd6, 0xa8, 0xfc, 0x5d, 0x82, 0xad, 0xbc, 0x79, 0xa1, 0x21, 0x6c, 0xb8, 0x49, - 0xbd, 0x1a, 0x6e, 0x5b, 0xbd, 0xff, 0x5e, 0x76, 0xca, 0x7c, 0xf3, 0xb2, 0x01, 0xd9, 0x2c, 0x44, - 0x0f, 0xc1, 0x9a, 0x97, 0x85, 0xe8, 0x1e, 0xce, 0x06, 0x28, 0x7f, 0x93, 0x60, 0x23, 0x53, 0x0e, - 0xf5, 0xa1, 0x2e, 0x78, 0x81, 0xaf, 0x4d, 0x4a, 0x42, 0x2a, 0xd6, 0xe3, 0xa4, 0x13, 0xfa, 0x04, - 0xb6, 0x84, 0x38, 0xf5, 0x6d, 0x97, 0xe8, 0xf4, 0x8a, 0x13, 0x87, 0x80, 0xcf, 0xa3, 0x83, 0x90, - 0x50, 0x0e, 0x52, 0x66, 0x9c, 0x1b, 0xa4, 0xbc, 0x58, 0x5d, 0x15, 0xd1, 0x3d, 0xf4, 0x24, 0x01, - 0x4a, 0x29, 0xff, 0x1c, 0x27, 0xb0, 0xc8, 0x09, 0xc2, 0x35, 0x9c, 0x4e, 0xb1, 0x2b, 0xb3, 0x53, - 0xc2, 0x05, 0xe5, 0xf7, 0xd0, 0x14, 0x34, 0x22, 0xe8, 0xea, 0x7b, 0x50, 0xf6, 0xa8, 0x6b, 0xd0, - 0xf0, 0x70, 0xd6, 0x79, 0xca, 0x29, 0x57, 0x61, 0x61, 0x42, 0x3f, 0x84, 0x35, 0x93, 0xfa, 0x44, - 0xf4, 0xb2, 0x19, 0x8e, 0x77, 0xb9, 0xf0, 0x2f, 0xa9, 0x4f, 0xe6, 0xc4, 0x27, 0x98, 0x3b, 0x28, - 0x5f, 0x49, 0x50, 0x9e, 0xa6, 0x63, 0xa4, 0x44, 0x4c, 0x60, 0x4a, 0xc7, 0xa0, 0x9f, 0x43, 0x63, - 0x4e, 0x35, 0xdb, 0x74, 0x5c, 0xea, 0x79, 0x74, 0x1e, 0x0d, 0x8c, 0x05, 0x0c, 0x13, 0x86, 0x20, - 0xf8, 0xac, 0x80, 0x53, 0xee, 0xe8, 0x63, 0x80, 0x44, 0xb0, 0x9c, 0x08, 0xbe, 0x3c, 0x3a, 0xc9, - 0x06, 0x27, 0x9c, 0x8f, 0x2b, 0x82, 0x48, 0x94, 0x97, 0xd0, 0x4a, 0x2f, 0x0d, 0xb5, 0xa0, 0x68, - 0xcc, 0x05, 0xeb, 0x14, 0x8d, 0x39, 0x7a, 0x0c, 0x35, 0xce, 0xb0, 0xaa, 0x61, 0x52, 0x41, 0xaf, - 0xb1, 0x02, 0x75, 0xa0, 0x42, 0xad, 0x39, 0xb7, 0x05, 0xc7, 0x3d, 0x14, 0x95, 0x19, 0xa0, 0x6c, - 0x0f, 0xe8, 0x00, 0x80, 0x55, 0x71, 0x6c, 0xc3, 0xf2, 0xc3, 0xc1, 0xb7, 0x82, 0x86, 0x43, 0x35, - 0x4e, 0x78, 0xa0, 0xc7, 0xb0, 0xe6, 0x33, 0x78, 0x17, 0xb9, 0x67, 0x35, 0xdc, 0x75, 0xcc, 0xb5, - 0xca, 0x2f, 0xa1, 0x16, 0x85, 0xb1, 0x85, 0xb2, 0xbb, 0xc3, 0xf3, 0x89, 0xe9, 0x08, 0x4e, 0x8b, - 0x15, 0x69, 0xea, 0x94, 0x04, 0x75, 0x2a, 0x3d, 0x90, 0x55, 
0xa2, 0xbf, 0x3d, 0xd7, 0x2a, 0x5f, - 0x00, 0xca, 0x0e, 0x97, 0xdd, 0x7c, 0x71, 0xa7, 0xfc, 0x38, 0x06, 0x99, 0x56, 0xb4, 0xe8, 0x67, - 0x0c, 0xc7, 0xce, 0xc2, 0xd0, 0x48, 0xd8, 0xd1, 0x6e, 0x66, 0xbf, 0x7e, 0xcd, 0xea, 0x78, 0x38, - 0x70, 0xc3, 0x91, 0xbf, 0x72, 0x06, 0xdb, 0x0f, 0xba, 0xa1, 0x0f, 0xa0, 0xea, 0x51, 0xdd, 0xa4, - 0xf1, 0x50, 0xd7, 0x45, 0xe2, 0xa9, 0x50, 0xe3, 0xc8, 0x41, 0xf9, 0x03, 0x40, 0xac, 0x47, 0xef, - 0x43, 0xd9, 0xa4, 0xae, 0x4e, 0xe7, 0x02, 0xaf, 0xad, 0x74, 0x20, 0x16, 0x56, 0xb4, 0x0f, 0xd5, - 0xa5, 0x25, 0x3c, 0x8b, 0x89, 0x7d, 0x8b, 0x3d, 0x23, 0xbb, 0xf2, 0x67, 0x09, 0x6a, 0x91, 0x9e, - 0x4d, 0xf7, 0x96, 0x92, 0x10, 0x53, 0xfc, 0x9b, 0xe9, 0x7c, 0x62, 0x2c, 0xc4, 0x70, 0xf9, 0x77, - 0x1a, 0x69, 0xf2, 0x2a, 0xd2, 0x1e, 0x43, 0x6d, 0xb6, 0xb0, 0xb5, 0xbb, 0xa9, 0xf1, 0x25, 0xe5, - 0x6c, 0x27, 0xe3, 0x58, 0x81, 0x76, 0xa0, 0xaa, 0xdd, 0x52, 0xed, 0xce, 0x5b, 0x9a, 0xfc, 0x6a, - 0x68, 0xe2, 0x48, 0x56, 0xfe, 0x25, 0x41, 0x73, 0x4a, 0x89, 0x1b, 0x3f, 0x21, 0x9e, 0xad, 0x5e, - 0xce, 0x6f, 0xf5, 0x34, 0x88, 0x1e, 0x1e, 0xc5, 0x9c, 0x87, 0x87, 0x1c, 0x3f, 0x3c, 0xde, 0xf9, - 0x09, 0x71, 0x0a, 0xcd, 0xcb, 0x23, 0x95, 0xe8, 0x57, 0xae, 0xed, 0x50, 0xd7, 0xbf, 0xcf, 0x9c, - 0xc5, 0x2c, 0xce, 0x8a, 0x79, 0x38, 0x53, 0x46, 0xb0, 0x9e, 0x4c, 0xc4, 0x20, 0xda, 0x07, 0x70, - 0x22, 0x49, 0x60, 0x04, 0x89, 0x0d, 0x4c, 0x94, 0xc4, 0x09, 0x2f, 0xe5, 0x23, 0xfe, 0xa4, 0x89, - 0x56, 0xd3, 0x06, 0xf9, 0x8e, 0xde, 0x8b, 0xe5, 0xb0, 0x4f, 0xf4, 0x1e, 0x94, 0xf9, 0xb1, 0x08, - 0xd7, 0x21, 0x24, 0x65, 0x00, 0xcd, 0x74, 0xf5, 0xc3, 0x9c, 0xea, 0xd1, 0xbc, 0x73, 0x6b, 0x7f, - 0x25, 0x31, 0x66, 0x0a, 0x36, 0x4d, 0x10, 0xf6, 0x4f, 0x57, 0xe8, 0x32, 0xd8, 0x36, 0xb4, 0x92, - 0x26, 0x8f, 0x29, 0x7f, 0x92, 0x62, 0xca, 0x80, 0x66, 0xb7, 0x32, 0xcd, 0x67, 0x68, 0x32, 0x62, - 0x72, 0xf9, 0x0d, 0xec, 0x1f, 0xf3, 0xe9, 0xbf, 0x25, 0xd8, 0x61, 0x87, 0x74, 0x41, 0x7d, 0xca, - 0x6f, 0xde, 0x00, 0x71, 0xe1, 0x03, 0xe0, 0x47, 0xe2, 0xa9, 0x16, 0xdc, 0xab, 0xdf, 0xe2, 0x09, - 
0x93, 0xee, 0xf1, 0x7b, 0x8d, 0xed, 0xf5, 0x2b, 0x63, 0xe1, 0x53, 0x77, 0x4c, 0x4c, 0xaa, 0x86, - 0x1c, 0xd8, 0xc0, 0x2b, 0xda, 0x18, 0x95, 0x72, 0x0e, 0x2a, 0xd7, 0x72, 0x51, 0x59, 0x7a, 0x13, - 0x2a, 0x95, 0xbf, 0x4a, 0xb0, 0x99, 0xd3, 0xc6, 0x3b, 0x1e, 0x9c, 0x8f, 0xe3, 0xd2, 0xc1, 0xec, - 0xbf, 0x9b, 0x69, 0x3c, 0x3d, 0xa7, 0xfc, 0xe3, 0xd1, 0x85, 0xaa, 0x4a, 0x74, 0xd6, 0x38, 0xef, - 0x9a, 0xb1, 0x74, 0x80, 0xa5, 0x06, 0x0e, 0x04, 0xe5, 0x19, 0xf7, 0xe0, 0xd4, 0xf8, 0x06, 0xb4, - 0xca, 0x09, 0xb4, 0xf6, 0xa1, 0x16, 0x46, 0x79, 0xe8, 0x07, 0x91, 0x53, 0x80, 0xd2, 0x66, 0xd8, - 0x1c, 0xb7, 0x47, 0x31, 0xff, 0x90, 0x60, 0x2b, 0xbd, 0x7e, 0x01, 0xd2, 0x7d, 0xa8, 0xcc, 0xe9, - 0x2b, 0xb2, 0x5c, 0xf8, 0x29, 0x3e, 0x8d, 0x0a, 0x9c, 0x15, 0x70, 0xe8, 0x80, 0x7e, 0x0c, 0x35, - 0xbe, 0xee, 0x89, 0xb5, 0x08, 0x5f, 0x4b, 0x51, 0x39, 0xde, 0xe6, 0x59, 0x01, 0xc7, 0x1e, 0xef, - 0x80, 0xc6, 0x3f, 0x41, 0x2b, 0xed, 0x80, 0x76, 0x01, 0xe8, 0x17, 0xb7, 0x64, 0xe9, 0xf9, 0xc6, - 0xe7, 0x01, 0x0c, 0xab, 0x38, 0xa1, 0x41, 0x7b, 0x50, 0xfd, 0x23, 0x71, 0x2d, 0xc3, 0x8a, 0xee, - 0xdc, 0x06, 0xaf, 0xf3, 0x22, 0x50, 0xe2, 0xc8, 0x8a, 0xba, 0x50, 0x77, 0xa3, 0x27, 0x2f, 0xfb, - 0x79, 0x25, 0xef, 0xc9, 0x38, 0xa9, 0x52, 0x3e, 0x82, 0x8a, 0x08, 0xcb, 0xbd, 0x60, 0x3b, 0x50, - 0x31, 0xa9, 0xe7, 0x11, 0x3d, 0xbc, 0x62, 0x43, 0x71, 0x9f, 0x42, 0x3d, 0xf1, 0xfb, 0x05, 0xd5, - 0xa0, 0x34, 0xfa, 0xf4, 0x7a, 0x70, 0xd1, 0x2e, 0xa0, 0x06, 0x54, 0xc7, 0x13, 0x35, 0x90, 0x24, - 0x04, 0x50, 0xc6, 0xa3, 0xd3, 0xd1, 0xcb, 0xab, 0x76, 0x11, 0x35, 0xa1, 0x36, 0x9e, 0xa8, 0x42, - 0x94, 0x99, 0x69, 0xf4, 0xf2, 0x7c, 0xaa, 0x4e, 0xdb, 0x6b, 0xc2, 0x24, 0xc4, 0x12, 0xaa, 0x80, - 0x3c, 0xb8, 0xb8, 0x68, 0x97, 0xf7, 0x35, 0xa8, 0x27, 0xde, 0xb4, 0xa8, 0x03, 0x5b, 0xd7, 0xe3, - 0x4f, 0xc6, 0x93, 0x17, 0xe3, 0x9b, 0xcb, 0x91, 0x8a, 0xcf, 0x4f, 0xa6, 0x37, 0xea, 0x6f, 0xae, - 0x46, 0xed, 0x02, 0xfa, 0x0e, 0x6c, 0x5f, 0x8f, 0x07, 0xa7, 0xa7, 0x78, 0x74, 0x3a, 0x50, 0x47, - 0xc3, 0xb4, 0x59, 0x42, 0xdf, 0x86, 
0x47, 0x0f, 0x19, 0x8b, 0xfb, 0xe7, 0xd0, 0x48, 0xfe, 0xc4, - 0x40, 0x08, 0x5a, 0xc3, 0xd1, 0xf3, 0xc1, 0xf5, 0x85, 0x7a, 0x33, 0xb9, 0x52, 0xcf, 0x27, 0xe3, - 0x76, 0x01, 0x6d, 0x40, 0xf3, 0xf9, 0x04, 0x9f, 0x8c, 0x6e, 0x46, 0xe3, 0xc1, 0xf1, 0xc5, 0x68, - 0xd8, 0x96, 0x98, 0x5b, 0xa0, 0x1a, 0x9e, 0x4f, 0x03, 0x5d, 0x71, 0xff, 0x09, 0xb4, 0x57, 0xb9, - 0x02, 0xd5, 0xa1, 0x22, 0xd2, 0xb5, 0x0b, 0x4c, 0x50, 0x07, 0xa7, 0xe3, 0xc1, 0xe5, 0xa8, 0x2d, - 0xf5, 0xbf, 0x96, 0xa0, 0xc4, 0x5f, 0xd0, 0xe8, 0x29, 0x94, 0x83, 0x5f, 0xea, 0x28, 0xe0, 0xca, - 0xd4, 0xef, 0xf8, 0x9d, 0xcd, 0x94, 0x4e, 0xa0, 0xf8, 0x10, 0x4a, 0x9c, 0x18, 0x50, 0x82, 0x24, - 0xc2, 0x00, 0x94, 0x54, 0x05, 0xfe, 0x87, 0x12, 0x3a, 0x62, 0xcf, 0x5f, 0x46, 0xd7, 0xa2, 0x48, - 0xea, 0xc2, 0xdd, 0xd9, 0x4c, 0xe9, 0xa2, 0xa0, 0x11, 0x34, 0x92, 0x1d, 0xa1, 0xce, 0x43, 0xbc, - 0xb0, 0xb3, 0x9d, 0x63, 0x09, 0xd3, 0x1c, 0x3f, 0xfa, 0xcf, 0xeb, 0x5d, 0xe9, 0xbf, 0xaf, 0x77, - 0xa5, 0xff, 0xbd, 0xde, 0x95, 0xfe, 0xf2, 0xff, 0xdd, 0xc2, 0x6f, 0x4b, 0xfc, 0xbf, 0x90, 0x59, - 0x99, 0xff, 0x77, 0x71, 0xf4, 0x4d, 0x00, 0x00, 0x00, 0xff, 0xff, 0x48, 0x65, 0x25, 0x55, 0x48, - 0x11, 0x00, 0x00, + // 1679 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x58, 0xdb, 0x72, 0x1b, 0x49, + 0x19, 0xd6, 0x68, 0xac, 0xd3, 0xaf, 0x43, 0xe4, 0xb6, 0xd9, 0x28, 0x26, 0x18, 0xd5, 0x00, 0x8b, + 0xf1, 0x06, 0x2b, 0x91, 0xb3, 0x2c, 0x4b, 0x15, 0x07, 0x39, 0x52, 0x6c, 0xd7, 0xda, 0x92, 0xb7, + 0x35, 0x26, 0x81, 0x82, 0x32, 0xad, 0x51, 0x67, 0x3c, 0x65, 0xcd, 0x61, 0xe7, 0xb0, 0xac, 0xb7, + 0xb8, 0xe0, 0x9e, 0x1b, 0x8a, 0xe2, 0x09, 0xa0, 0xe0, 0x09, 0xf6, 0x11, 0xb8, 0xe0, 0x92, 0x47, + 0xa0, 0xc2, 0x0d, 0x8f, 0xb1, 0xd5, 0x3d, 0x3d, 0x27, 0xcd, 0xb8, 0x92, 0xca, 0xdd, 0xfc, 0xe7, + 0xfe, 0xff, 0xfe, 0xfa, 0xeb, 0x96, 0xe0, 0x67, 0xba, 0xe1, 0x5f, 0x07, 0x8b, 0x03, 0xcd, 0x36, + 0x07, 0xe6, 0xe1, 0x72, 0x31, 0x30, 0x0f, 0x07, 0x9e, 0xab, 0x0d, 0x3e, 0x0b, 0xa8, 0x7b, 0x3b, + 0xd0, 
0xa9, 0x45, 0x5d, 0xe2, 0xd3, 0xe5, 0xc0, 0x71, 0x6d, 0xdf, 0x1e, 0xb8, 0x8e, 0xe6, 0x2c, + 0x42, 0xdb, 0x01, 0xd7, 0x20, 0xd9, 0x75, 0xb4, 0x9d, 0xf1, 0x1d, 0x49, 0x4c, 0xea, 0xbb, 0x86, + 0xe6, 0xe5, 0xd2, 0x38, 0xf6, 0xca, 0xd0, 0x6e, 0x9d, 0x85, 0xf8, 0x08, 0x53, 0x29, 0xf7, 0xa0, + 0x7d, 0x42, 0xc9, 0xca, 0xbf, 0xc6, 0xf4, 0xb3, 0x80, 0x7a, 0xbe, 0xf2, 0x0a, 0x3a, 0x91, 0xc2, + 0x73, 0x6c, 0xcb, 0xa3, 0xe8, 0x7d, 0xe8, 0x04, 0x8e, 0x6f, 0x98, 0x74, 0x1c, 0xb8, 0xc4, 0x37, + 0x6c, 0xab, 0x27, 0xf5, 0xa5, 0xbd, 0x06, 0x5e, 0xd3, 0xa2, 0x47, 0xb0, 0x19, 0x6a, 0xa6, 0xc4, + 0xb2, 0x3d, 0xaa, 0xd9, 0xd6, 0xd2, 0xeb, 0x95, 0xfb, 0xd2, 0x9e, 0x8c, 0xf3, 0x06, 0xe5, 0x1f, + 0x12, 0xb4, 0x9e, 0x53, 0x5f, 0x8b, 0x0a, 0xa3, 0x6d, 0xa8, 0x78, 0x3e, 0x71, 0x7d, 0x9e, 0x5d, + 0xc6, 0xa1, 0x80, 0xba, 0x20, 0x53, 0x6b, 0x29, 0xd2, 0xb0, 0x4f, 0xf4, 0x14, 0x9a, 0x3e, 0xd1, + 0xcf, 0x89, 0xaf, 0x5d, 0x53, 0xd7, 0xeb, 0xc9, 0x7d, 0x69, 0xaf, 0x39, 0xec, 0x1e, 0xb8, 0x8e, + 0x76, 0xa0, 0x26, 0xfa, 0x93, 0x12, 0x4e, 0xbb, 0xa1, 0x0f, 0xa0, 0x66, 0x3b, 0x6c, 0x99, 0x5e, + 0x6f, 0x83, 0x47, 0x6c, 0xf2, 0x08, 0xbe, 0x82, 0x59, 0x68, 0xc0, 0x91, 0xc7, 0x11, 0x40, 0xdd, + 0x14, 0x81, 0xca, 0x2f, 0xa0, 0x99, 0x4a, 0x8b, 0x9e, 0x64, 0xab, 0x4b, 0x7d, 0x79, 0xaf, 0x39, + 0xbc, 0xb7, 0x56, 0x3d, 0x53, 0x5a, 0xf9, 0x0d, 0x40, 0x62, 0x42, 0x08, 0x36, 0x2c, 0x62, 0x52, + 0xde, 0x65, 0x0b, 0xf3, 0x6f, 0xd6, 0xfa, 0xe7, 0x64, 0x15, 0x50, 0xde, 0x66, 0x0b, 0x87, 0x02, + 0xfa, 0x2e, 0x6c, 0xf8, 0xb7, 0x0e, 0xe5, 0x1d, 0x76, 0x44, 0x87, 0x22, 0x8b, 0x7a, 0xeb, 0x50, + 0xcc, 0xad, 0xca, 0x1f, 0x65, 0x31, 0x47, 0xd1, 0x05, 0x4b, 0xb6, 0x32, 0x4c, 0x23, 0x9e, 0x23, + 0x17, 0xd0, 0x87, 0x50, 0x77, 0xa9, 0xc7, 0x90, 0xe1, 0xf3, 0x2a, 0xcd, 0xe1, 0x03, 0x9e, 0x10, + 0x0b, 0xe5, 0xa7, 0x0c, 0x5e, 0xd1, 0x20, 0x62, 0x57, 0xb4, 0x0f, 0xdd, 0x95, 0x6d, 0xdf, 0x2c, + 0x88, 0x76, 0x13, 0xef, 0xbe, 0xcc, 0xf3, 0xe6, 0xf4, 0xe8, 0x43, 0x68, 0x05, 0x16, 0xd1, 0x75, + 0x97, 0xea, 0x0c, 0x76, 0x7c, 0xce, 0x9d, 
0x68, 0xce, 0xc4, 0xb2, 0x03, 0x3f, 0xcc, 0x8f, 0x33, + 0x6e, 0xe8, 0x09, 0x40, 0x2a, 0xa8, 0x72, 0x57, 0x50, 0xca, 0x09, 0x3d, 0x83, 0xad, 0x44, 0x62, + 0x76, 0xd3, 0xf8, 0x92, 0x2e, 0x7b, 0xd5, 0xbb, 0x62, 0x8b, 0xbc, 0xd1, 0x63, 0xd8, 0x34, 0x2c, + 0x6d, 0x15, 0x2c, 0x29, 0xa6, 0x9e, 0xbd, 0x0a, 0x78, 0x6f, 0xb5, 0xbe, 0xb4, 0x57, 0x3f, 0x2a, + 0xf7, 0x24, 0x9c, 0x37, 0xa2, 0xf7, 0xa0, 0xea, 0xd9, 0x81, 0xab, 0xd1, 0x5e, 0x9d, 0xef, 0x93, + 0x90, 0x94, 0xbf, 0x49, 0xb0, 0x5d, 0x34, 0x47, 0x34, 0x86, 0x4d, 0x37, 0xad, 0x57, 0xa3, 0xed, + 0x6c, 0x0e, 0xdf, 0xcb, 0x4f, 0x9f, 0x6f, 0x6a, 0x3e, 0x20, 0x9f, 0x85, 0xe8, 0x11, 0x88, 0x8b, + 0xb2, 0x10, 0xdd, 0xc3, 0xf9, 0x00, 0xe5, 0xaf, 0x12, 0x6c, 0xe6, 0xca, 0xa1, 0x21, 0x34, 0x05, + 0x5f, 0xf0, 0xb5, 0x49, 0x69, 0xa8, 0x25, 0x7a, 0x9c, 0x76, 0x42, 0x9f, 0xc0, 0xb6, 0x10, 0xe7, + 0xbe, 0xed, 0x12, 0x9d, 0x5e, 0x70, 0x42, 0x11, 0xb0, 0xba, 0x7f, 0x10, 0x11, 0xcd, 0x41, 0xc6, + 0x8c, 0x0b, 0x83, 0x94, 0x17, 0xeb, 0xab, 0x22, 0xba, 0x87, 0x1e, 0xa5, 0xc0, 0x2a, 0x15, 0x9f, + 0xef, 0x14, 0x46, 0x39, 0x71, 0xb8, 0x86, 0xd3, 0x2b, 0xf7, 0x65, 0x76, 0x7a, 0xb8, 0xa0, 0xfc, + 0x16, 0xda, 0x82, 0x5e, 0x04, 0x8d, 0x7d, 0x07, 0xaa, 0x1e, 0x75, 0x0d, 0x1a, 0x1d, 0xda, 0x26, + 0x4f, 0x39, 0xe7, 0x2a, 0x2c, 0x4c, 0xe8, 0xfb, 0xb0, 0x61, 0x52, 0x9f, 0x88, 0x5e, 0xb6, 0xa2, + 0xf1, 0x06, 0x2b, 0xff, 0x9c, 0xfa, 0x64, 0x49, 0x7c, 0x82, 0xb9, 0x83, 0xf2, 0x95, 0x04, 0xd5, + 0x79, 0x36, 0x46, 0x4a, 0xc5, 0x84, 0xa6, 0x6c, 0x0c, 0xfa, 0x29, 0xb4, 0x96, 0x54, 0xb3, 0x4d, + 0xc7, 0xa5, 0x9e, 0x47, 0x97, 0xf1, 0xc0, 0x58, 0xc0, 0x38, 0x65, 0x08, 0x83, 0x4f, 0x4a, 0x38, + 0xe3, 0x8e, 0x3e, 0x06, 0x48, 0x05, 0xcb, 0xa9, 0xe0, 0xf3, 0xc3, 0x67, 0xf9, 0xe0, 0x94, 0xf3, + 0x51, 0x4d, 0x10, 0x8c, 0xf2, 0x12, 0x3a, 0xd9, 0xa5, 0xa1, 0x0e, 0x94, 0x8d, 0xa5, 0x60, 0xa3, + 0xb2, 0xb1, 0x44, 0x0f, 0xa1, 0xc1, 0x99, 0x57, 0x35, 0x4c, 0x2a, 0x68, 0x37, 0x51, 0xa0, 0x1e, + 0xd4, 0xa8, 0xb5, 0xe4, 0xb6, 0x90, 0x06, 0x22, 0x51, 0x59, 0x00, 0xca, 0xf7, 
0x80, 0x0e, 0x00, + 0x58, 0x15, 0xc7, 0x36, 0x2c, 0x3f, 0x1a, 0x7c, 0x27, 0x6c, 0x38, 0x52, 0xe3, 0x94, 0x07, 0x7a, + 0x08, 0x1b, 0x3e, 0x83, 0x77, 0x99, 0x7b, 0xd6, 0xa3, 0x5d, 0xc7, 0x5c, 0xab, 0xfc, 0x1c, 0x1a, + 0x71, 0x18, 0x5b, 0x28, 0xbb, 0x53, 0x3c, 0x9f, 0x98, 0x8e, 0xe0, 0xba, 0x44, 0x91, 0xa5, 0x54, + 0x49, 0x50, 0xaa, 0x32, 0x00, 0x59, 0x25, 0xfa, 0xdb, 0x73, 0xb0, 0xf2, 0x05, 0xa0, 0xfc, 0x70, + 0xd9, 0x8d, 0x98, 0x74, 0xca, 0x8f, 0x63, 0x98, 0x69, 0x4d, 0x8b, 0x7e, 0xc2, 0x70, 0xec, 0xac, + 0x0c, 0x8d, 0x44, 0x1d, 0xed, 0xe6, 0xf6, 0xeb, 0x97, 0xac, 0x8e, 0x87, 0x43, 0x37, 0x1c, 0xfb, + 0x2b, 0x27, 0xf0, 0xe0, 0x4e, 0x37, 0xf4, 0x01, 0xd4, 0x3d, 0xaa, 0x9b, 0x34, 0x19, 0xea, 0x3d, + 0x91, 0x78, 0x2e, 0xd4, 0x38, 0x76, 0x50, 0x7e, 0x07, 0x90, 0xe8, 0xd1, 0xfb, 0x50, 0x35, 0xa9, + 0xab, 0xd3, 0xa5, 0xc0, 0x6b, 0x27, 0x1b, 0x88, 0x85, 0x15, 0xed, 0x43, 0x3d, 0xb0, 0x84, 0x67, + 0x39, 0xb5, 0x6f, 0x89, 0x67, 0x6c, 0x57, 0xfe, 0x24, 0x41, 0x23, 0xd6, 0xb3, 0xe9, 0x5e, 0x53, + 0x12, 0x61, 0x8a, 0x7f, 0x33, 0x9d, 0x4f, 0x8c, 0x95, 0x18, 0x2e, 0xff, 0xce, 0x22, 0x4d, 0x5e, + 0x47, 0xda, 0x43, 0x68, 0x2c, 0x56, 0xb6, 0x76, 0x33, 0x37, 0xbe, 0xa4, 0x9c, 0xed, 0x64, 0x9c, + 0x28, 0xd0, 0x0e, 0xd4, 0xb5, 0x6b, 0xaa, 0xdd, 0x78, 0x81, 0xc9, 0xaf, 0x8c, 0x36, 0x8e, 0x65, + 0xe5, 0x9f, 0x12, 0xb4, 0xe7, 0x94, 0xb8, 0xc9, 0xd3, 0xe2, 0xe9, 0xfa, 0xa5, 0xfd, 0x56, 0x4f, + 0x86, 0xf8, 0x41, 0x52, 0x2e, 0x78, 0x90, 0xc8, 0xc9, 0x83, 0xe4, 0x9d, 0x9f, 0x16, 0xc7, 0xd0, + 0x3e, 0x3f, 0x54, 0x89, 0x7e, 0xe1, 0xda, 0x0e, 0x75, 0xfd, 0xdb, 0xdc, 0x59, 0xcc, 0xe3, 0xac, + 0x5c, 0x84, 0x33, 0x65, 0x02, 0xf7, 0xd2, 0x89, 0x18, 0x44, 0x87, 0x00, 0x4e, 0x2c, 0x09, 0x8c, + 0x20, 0xb1, 0x81, 0xa9, 0x92, 0x38, 0xe5, 0xa5, 0x7c, 0xc4, 0x9f, 0x3a, 0xf1, 0x6a, 0xba, 0x20, + 0xdf, 0xd0, 0x5b, 0xb1, 0x1c, 0xf6, 0xc9, 0x2e, 0x40, 0x7e, 0x2c, 0xa2, 0x75, 0x08, 0x49, 0x19, + 0x41, 0x3b, 0x5b, 0xfd, 0x71, 0x41, 0xf5, 0x78, 0xde, 0x85, 0xb5, 0xbf, 0x92, 0x18, 0x33, 0x85, + 0x9b, 0x26, 0x08, 
0xfb, 0xc7, 0x6b, 0x74, 0x19, 0x6e, 0x1b, 0x5a, 0x4b, 0x53, 0xc4, 0x94, 0x3f, + 0xca, 0x30, 0x65, 0x48, 0xb3, 0xdb, 0xb9, 0xe6, 0x73, 0x34, 0x19, 0x33, 0xb9, 0xfc, 0x06, 0xf6, + 0x4f, 0xf8, 0xf4, 0x5f, 0x12, 0xec, 0xb0, 0x43, 0xba, 0xa2, 0x3e, 0xe5, 0x37, 0x6f, 0x88, 0xb8, + 0xe8, 0x01, 0xf0, 0x03, 0xf1, 0x84, 0x0b, 0xef, 0xd5, 0x6f, 0xf0, 0x84, 0x69, 0xf7, 0xe4, 0x1d, + 0xc7, 0xf6, 0xfa, 0x95, 0xb1, 0xf2, 0xa9, 0x3b, 0x25, 0x26, 0x55, 0x23, 0x0e, 0x6c, 0xe1, 0x35, + 0x6d, 0x82, 0x4a, 0xb9, 0x00, 0x95, 0x1b, 0x85, 0xa8, 0xac, 0xbc, 0x09, 0x95, 0xca, 0x5f, 0x24, + 0xd8, 0x2a, 0x68, 0xe3, 0x1d, 0x0f, 0xce, 0xc7, 0x49, 0xe9, 0x70, 0xf6, 0xdf, 0xce, 0x35, 0x9e, + 0x9d, 0x53, 0xf1, 0xf1, 0xe8, 0x43, 0x5d, 0x25, 0x3a, 0x6b, 0x9c, 0x77, 0xcd, 0x58, 0x3a, 0xc4, + 0x52, 0x0b, 0x87, 0x82, 0xf2, 0x94, 0x7b, 0x70, 0x6a, 0x7c, 0x03, 0x5a, 0xe5, 0x14, 0x5a, 0x87, + 0xd0, 0x88, 0xa2, 0x3c, 0xf4, 0xbd, 0xd8, 0x29, 0x44, 0x69, 0x3b, 0x6a, 0x8e, 0xdb, 0xe3, 0x98, + 0xbf, 0x4b, 0xb0, 0x9d, 0x5d, 0xbf, 0x00, 0xe9, 0x3e, 0xd4, 0x96, 0xf4, 0x15, 0x09, 0x56, 0x7e, + 0x86, 0x4f, 0xe3, 0x02, 0x27, 0x25, 0x1c, 0x39, 0xa0, 0x1f, 0x42, 0x83, 0xaf, 0x7b, 0x66, 0xad, + 0xa2, 0xd7, 0x52, 0x5c, 0x8e, 0xb7, 0x79, 0x52, 0xc2, 0x89, 0xc7, 0x3b, 0xa0, 0xf1, 0x0f, 0xd0, + 0xc9, 0x3a, 0xa0, 0x5d, 0x00, 0xfa, 0xc5, 0x35, 0x09, 0x3c, 0xdf, 0xf8, 0x3c, 0x84, 0x61, 0x1d, + 0xa7, 0x34, 0x68, 0x0f, 0xea, 0xbf, 0x27, 0xae, 0x65, 0x58, 0xf1, 0x9d, 0xdb, 0xe2, 0x75, 0x5e, + 0x84, 0x4a, 0x1c, 0x5b, 0x51, 0x1f, 0x9a, 0x6e, 0xfc, 0x14, 0x66, 0x3f, 0xbb, 0xe4, 0x3d, 0x19, + 0xa7, 0x55, 0xca, 0x47, 0x50, 0x13, 0x61, 0x85, 0x17, 0x6c, 0x0f, 0x6a, 0x26, 0xf5, 0x3c, 0xa2, + 0x47, 0x57, 0x6c, 0x24, 0xee, 0x53, 0x68, 0xa6, 0x7e, 0xd7, 0xa0, 0x06, 0x54, 0x26, 0x9f, 0x5e, + 0x8e, 0xce, 0xba, 0x25, 0xd4, 0x82, 0xfa, 0x74, 0xa6, 0x86, 0x92, 0x84, 0x00, 0xaa, 0x78, 0x72, + 0x3c, 0x79, 0x79, 0xd1, 0x2d, 0xa3, 0x36, 0x34, 0xa6, 0x33, 0x55, 0x88, 0x32, 0x33, 0x4d, 0x5e, + 0x9e, 0xce, 0xd5, 0x79, 0x77, 0x43, 0x98, 0x84, 0x58, 
0x41, 0x35, 0x90, 0x47, 0x67, 0x67, 0xdd, + 0xea, 0xbe, 0x06, 0xcd, 0xd4, 0x9b, 0x16, 0xf5, 0x60, 0xfb, 0x72, 0xfa, 0xc9, 0x74, 0xf6, 0x62, + 0x7a, 0x75, 0x3e, 0x51, 0xf1, 0xe9, 0xb3, 0xf9, 0x95, 0xfa, 0xab, 0x8b, 0x49, 0xb7, 0x84, 0xbe, + 0x05, 0x0f, 0x2e, 0xa7, 0xa3, 0xe3, 0x63, 0x3c, 0x39, 0x1e, 0xa9, 0x93, 0x71, 0xd6, 0x2c, 0xa1, + 0x6f, 0xc2, 0xfd, 0xbb, 0x8c, 0xe5, 0xfd, 0x53, 0x68, 0xa5, 0x7f, 0x7a, 0x20, 0x04, 0x9d, 0xf1, + 0xe4, 0xf9, 0xe8, 0xf2, 0x4c, 0xbd, 0x9a, 0x5d, 0xa8, 0xa7, 0xb3, 0x69, 0xb7, 0x84, 0x36, 0xa1, + 0xfd, 0x7c, 0x86, 0x9f, 0x4d, 0xae, 0x26, 0xd3, 0xd1, 0xd1, 0xd9, 0x64, 0xdc, 0x95, 0x98, 0x5b, + 0xa8, 0x1a, 0x9f, 0xce, 0x43, 0x5d, 0x79, 0xff, 0x11, 0x74, 0xd7, 0xb9, 0x02, 0x35, 0xa1, 0x26, + 0xd2, 0x75, 0x4b, 0x4c, 0x50, 0x47, 0xc7, 0xd3, 0xd1, 0xf9, 0xa4, 0x2b, 0x0d, 0xff, 0x2f, 0x41, + 0x85, 0xbf, 0xa0, 0xd1, 0x13, 0xa8, 0x86, 0xbf, 0xe0, 0x51, 0xc8, 0x95, 0x99, 0xdf, 0xf7, 0x3b, + 0x5b, 0x19, 0x9d, 0x40, 0xf1, 0x63, 0xa8, 0x70, 0x62, 0x40, 0x29, 0x92, 0x88, 0x02, 0x50, 0x5a, + 0x15, 0xfa, 0x3f, 0x96, 0xd0, 0x21, 0x7b, 0xfe, 0x32, 0xba, 0x16, 0x45, 0x32, 0x17, 0xee, 0xce, + 0x56, 0x46, 0x17, 0x07, 0x4d, 0xa0, 0x95, 0xee, 0x08, 0xf5, 0xee, 0xe2, 0x85, 0x9d, 0x07, 0x05, + 0x96, 0x28, 0xcd, 0xd1, 0xfd, 0x7f, 0xbf, 0xde, 0x95, 0xfe, 0xf3, 0x7a, 0x57, 0xfa, 0xef, 0xeb, + 0x5d, 0xe9, 0xcf, 0xff, 0xdb, 0x2d, 0xfd, 0xba, 0xc2, 0xff, 0x23, 0x59, 0x54, 0xf9, 0x7f, 0x1a, + 0x87, 0x5f, 0x07, 0x00, 0x00, 0xff, 0xff, 0xf9, 0x47, 0xd8, 0x5c, 0x60, 0x11, 0x00, 0x00, } diff --git a/src/query/generated/proto/rpcpb/query.proto b/src/query/generated/proto/rpcpb/query.proto index 03abdc82d2..b18550f45b 100644 --- a/src/query/generated/proto/rpcpb/query.proto +++ b/src/query/generated/proto/rpcpb/query.proto @@ -63,6 +63,7 @@ message FetchOptions { FanoutOption aggregatedOptimized = 6; // Deprecated: all requests will include resolution. 
bool includeResolution = 7 [deprecated=true]; + bytes source = 8; } message RestrictQueryOptions { diff --git a/src/query/graphite/common/context.go b/src/query/graphite/common/context.go index 6e4d5ba09a..7750925acb 100644 --- a/src/query/graphite/common/context.go +++ b/src/query/graphite/common/context.go @@ -57,6 +57,9 @@ type contextBase struct { // Limit provides a cap on the number of results returned from the database. Limit int + // Source is the query source. + Source []byte + parent *Context reqCtx ctx.Context storageContext context.Context diff --git a/src/query/graphite/native/expression.go b/src/query/graphite/native/expression.go index d76e8faa95..905529fd0b 100644 --- a/src/query/graphite/native/expression.go +++ b/src/query/graphite/native/expression.go @@ -94,6 +94,7 @@ func (f *fetchExpression) Execute(ctx *common.Context) (ts.SeriesList, error) { Timeout: ctx.Timeout, Limit: ctx.Limit, }, + Source: ctx.Source, } result, err := ctx.Engine.FetchByQuery(ctx, f.pathArg.path, opts) diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index 0ba21ab747..ec4845219c 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -388,6 +388,7 @@ func (s *m3WrappedStore) FetchByQuery( defer cancel() fetchOptions := storage.NewFetchOptions() fetchOptions.SeriesLimit = fetchOpts.Limit + fetchOptions.Source = fetchOpts.Source // NB: ensure single block return. fetchOptions.BlockType = models.TypeSingleBlock diff --git a/src/query/graphite/storage/storage.go b/src/query/graphite/storage/storage.go index f641e32b56..6aa3a9fcc9 100644 --- a/src/query/graphite/storage/storage.go +++ b/src/query/graphite/storage/storage.go @@ -37,6 +37,8 @@ type FetchOptions struct { EndTime time.Time // DataOptions are the options for the fetch. DataOptions + // Source is the query source. + Source []byte } // DataOptions provide data context. 
diff --git a/src/query/remote/codecs.go b/src/query/remote/codecs.go index 114e1341b2..d5ef494c74 100644 --- a/src/query/remote/codecs.go +++ b/src/query/remote/codecs.go @@ -165,7 +165,8 @@ func encodeFetchOptions(options *storage.FetchOptions) (*rpc.FetchOptions, error fanoutOpts := options.FanoutOptions result := &rpc.FetchOptions{ - Limit: int64(options.SeriesLimit), + Limit: int64(options.SeriesLimit), + Source: options.Source, } unagg, err := encodeFanoutOption(fanoutOpts.FanoutUnaggregated) @@ -498,6 +499,7 @@ func decodeFetchOptions(rpcFetchOptions *rpc.FetchOptions) (*storage.FetchOption result.LookbackDuration = &duration } + result.Source = rpcFetchOptions.Source return result, nil } diff --git a/src/query/storage/index.go b/src/query/storage/index.go index a933fc3413..7315076546 100644 --- a/src/query/storage/index.go +++ b/src/query/storage/index.go @@ -73,6 +73,7 @@ func FetchOptionsToM3Options(fetchOptions *FetchOptions, fetchQuery *FetchQuery) SeriesLimit: fetchOptions.SeriesLimit, DocsLimit: fetchOptions.DocsLimit, RequireExhaustive: fetchOptions.RequireExhaustive, + Source: fetchOptions.Source, StartInclusive: fetchQuery.Start, EndExclusive: fetchQuery.End, } @@ -96,6 +97,7 @@ func FetchOptionsToAggregateOptions( QueryOptions: index.QueryOptions{ SeriesLimit: fetchOptions.SeriesLimit, DocsLimit: fetchOptions.DocsLimit, + Source: fetchOptions.Source, StartInclusive: tagQuery.Start, EndExclusive: tagQuery.End, }, diff --git a/src/query/storage/types.go b/src/query/storage/types.go index a5157155e3..4c90986e25 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -133,6 +133,8 @@ type FetchOptions struct { Scope tally.Scope // Timeout is the timeout for the request. Timeout time.Duration + // Source is the source for the query. 
+ Source []byte } // FanoutOptions describes which namespaces should be fanned out to for diff --git a/src/x/headers/headers.go b/src/x/headers/headers.go index 74936dad32..230521b744 100644 --- a/src/x/headers/headers.go +++ b/src/x/headers/headers.go @@ -50,6 +50,9 @@ const ( // Valid values are "default" or "aggregate". WriteTypeHeader = M3HeaderPrefix + "Write-Type" + // SourceHeader tracks bytes and docs read for the given source, if provided. + SourceHeader = M3HeaderPrefix + "Source" + // DefaultWriteType is the default write type. DefaultWriteType = "default"