diff --git a/go.mod b/go.mod index 7cec7ccf03..b223fe63be 100644 --- a/go.mod +++ b/go.mod @@ -110,6 +110,7 @@ require ( golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634 golang.org/x/tools v0.0.0-20201013201025-64a9e34f3752 google.golang.org/grpc v1.29.1 + google.golang.org/protobuf v1.23.0 gopkg.in/go-ini/ini.v1 v1.57.0 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/validator.v9 v9.7.0 diff --git a/site/content/operational_guide/resource_limits.md b/site/content/operational_guide/resource_limits.md index f19268009f..3b6041388c 100644 --- a/site/content/operational_guide/resource_limits.md +++ b/site/content/operational_guide/resource_limits.md @@ -53,6 +53,15 @@ per second safely with your deployment and you want to use the default lookback of `15s` then you would multiply 10,000 by 15 to get 150,000 as a max value with a 15s lookback. +The third limit `maxRecentlyQueriedSeriesDiskRead` caps the series IDs matched by incoming +queries. This originally was distinct from the limit `maxRecentlyQueriedSeriesBlocks`, which +also limits the memory cost of specific series matched, because of an inefficiency +in how allocations would occur even for series known to not be present on disk for a given +shard. This inefficiency has been resolved https://github.com/m3db/m3/pull/3103 and therefore +this limit should be tracking memory cost linearly relative to `maxRecentlyQueriedSeriesBlocks`. +It is recommended to defer to using `maxRecentlyQueriedSeriesBlocks` over +`maxRecentlyQueriedSeriesDiskRead` given both should cap the resources similarly. + ### Annotated configuration ```yaml @@ -82,6 +91,18 @@ limits: # and read until the lookback period resets. lookback: 15s + # If set, will enforce a maximum on the series read from disk. + # This limit can be used to ensure queries that match an extremely high + # volume of series can be limited before even reading the underlying series data from disk. 
+ maxRecentlyQueriedSeriesDiskRead: + # Value sets the maximum number of series read from disk. + value: 0 + # Lookback sets the time window that this limit is enforced over, every + # lookback period the global count is reset to zero and when the limit + # is reached it will reject any further time series blocks being matched + # and read until the lookback period resets. + lookback: 15s + # If set then will limit the number of parallel write batch requests to the + # database and return errors if hit. + maxOutstandingWriteRequests: 0 @@ -94,6 +115,48 @@ limits: maxOutstandingReadRequests: 0 ``` + +### Dynamic configuration + +Query limits can be dynamically driven by etcd to adjust limits without redeploying. By updating the `m3db.query.limits` key in etcd, specific limits can be overridden. M3Coordinator exposes an API for updating etcd key/value pairs and so this API can be used for modifying these dynamic overrides. For example: + +``` +curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{ +  "key": "m3db.query.limits", +  "value":{ +    "maxRecentlyQueriedSeriesDiskBytesRead": { +      "limit":0, +      "lookbackSeconds":15, +      "forceExceeded":false +    }, +    "maxRecentlyQueriedSeriesBlocks": { +      "limit":0, +      "lookbackSeconds":15, +      "forceExceeded":false +    }, +    "maxRecentlyQueriedSeriesDiskRead": { +      "limit":0, +      "lookbackSeconds":15, +      "forceExceeded":false +    } +  }, +  "commit":true +}' +``` + +To remove all overrides, omit all limits from the `value`: +``` +curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/kvstore -d '{ +  "key": "m3db.query.limits", +  "value":{}, +  "commit":true +}' +``` + +Usage notes: +- Setting the `commit` flag to false allows for dry-run API calls to see the old and new limits that would be applied. +- Omitting a limit from the `value` results in that limit being driven by the config-based settings. +- The `forceExceeded` flag makes the limit behave as though it is permanently exceeded, thus failing all queries. 
This is useful for dynamically shutting down all queries in cases where load may be exceeding provisioned resources. + ## M3 Query and M3 Coordinator ### Deployment diff --git a/src/cluster/generated/proto/kvpb/kv.pb.go b/src/cluster/generated/proto/kvpb/kv.pb.go new file mode 100644 index 0000000000..d3c7c43f87 --- /dev/null +++ b/src/cluster/generated/proto/kvpb/kv.pb.go @@ -0,0 +1,1097 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: github.com/m3db/m3/src/cluster/generated/proto/kvpb/kv.proto + +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +/* + Package kvpb is a generated protocol buffer package. 
+ + It is generated from these files: + github.com/m3db/m3/src/cluster/generated/proto/kvpb/kv.proto + + It has these top-level messages: + KeyValueUpdate + KeyValueUpdateResult + QueryLimits + QueryLimit +*/ +package kvpb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type KeyValueUpdate struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Commit bool `protobuf:"varint,3,opt,name=commit,proto3" json:"commit,omitempty"` +} + +func (m *KeyValueUpdate) Reset() { *m = KeyValueUpdate{} } +func (m *KeyValueUpdate) String() string { return proto.CompactTextString(m) } +func (*KeyValueUpdate) ProtoMessage() {} +func (*KeyValueUpdate) Descriptor() ([]byte, []int) { return fileDescriptorKv, []int{0} } + +func (m *KeyValueUpdate) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *KeyValueUpdate) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +func (m *KeyValueUpdate) GetCommit() bool { + if m != nil { + return m.Commit + } + return false +} + +type KeyValueUpdateResult struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Old string `protobuf:"bytes,2,opt,name=old,proto3" json:"old,omitempty"` + New string `protobuf:"bytes,3,opt,name=new,proto3" json:"new,omitempty"` +} + +func (m *KeyValueUpdateResult) Reset() { *m = KeyValueUpdateResult{} } +func (m 
*KeyValueUpdateResult) String() string { return proto.CompactTextString(m) } +func (*KeyValueUpdateResult) ProtoMessage() {} +func (*KeyValueUpdateResult) Descriptor() ([]byte, []int) { return fileDescriptorKv, []int{1} } + +func (m *KeyValueUpdateResult) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *KeyValueUpdateResult) GetOld() string { + if m != nil { + return m.Old + } + return "" +} + +func (m *KeyValueUpdateResult) GetNew() string { + if m != nil { + return m.New + } + return "" +} + +type QueryLimits struct { + MaxRecentlyQueriedSeriesBlocks *QueryLimit `protobuf:"bytes,1,opt,name=maxRecentlyQueriedSeriesBlocks" json:"maxRecentlyQueriedSeriesBlocks,omitempty"` + MaxRecentlyQueriedSeriesDiskBytesRead *QueryLimit `protobuf:"bytes,2,opt,name=maxRecentlyQueriedSeriesDiskBytesRead" json:"maxRecentlyQueriedSeriesDiskBytesRead,omitempty"` + MaxRecentlyQueriedSeriesDiskRead *QueryLimit `protobuf:"bytes,3,opt,name=maxRecentlyQueriedSeriesDiskRead" json:"maxRecentlyQueriedSeriesDiskRead,omitempty"` +} + +func (m *QueryLimits) Reset() { *m = QueryLimits{} } +func (m *QueryLimits) String() string { return proto.CompactTextString(m) } +func (*QueryLimits) ProtoMessage() {} +func (*QueryLimits) Descriptor() ([]byte, []int) { return fileDescriptorKv, []int{2} } + +func (m *QueryLimits) GetMaxRecentlyQueriedSeriesBlocks() *QueryLimit { + if m != nil { + return m.MaxRecentlyQueriedSeriesBlocks + } + return nil +} + +func (m *QueryLimits) GetMaxRecentlyQueriedSeriesDiskBytesRead() *QueryLimit { + if m != nil { + return m.MaxRecentlyQueriedSeriesDiskBytesRead + } + return nil +} + +func (m *QueryLimits) GetMaxRecentlyQueriedSeriesDiskRead() *QueryLimit { + if m != nil { + return m.MaxRecentlyQueriedSeriesDiskRead + } + return nil +} + +type QueryLimit struct { + Limit int64 `protobuf:"varint,1,opt,name=limit,proto3" json:"limit,omitempty"` + LookbackSeconds int64 `protobuf:"varint,2,opt,name=lookbackSeconds,proto3" 
json:"lookbackSeconds,omitempty"` + ForceExceeded bool `protobuf:"varint,3,opt,name=forceExceeded,proto3" json:"forceExceeded,omitempty"` +} + +func (m *QueryLimit) Reset() { *m = QueryLimit{} } +func (m *QueryLimit) String() string { return proto.CompactTextString(m) } +func (*QueryLimit) ProtoMessage() {} +func (*QueryLimit) Descriptor() ([]byte, []int) { return fileDescriptorKv, []int{3} } + +func (m *QueryLimit) GetLimit() int64 { + if m != nil { + return m.Limit + } + return 0 +} + +func (m *QueryLimit) GetLookbackSeconds() int64 { + if m != nil { + return m.LookbackSeconds + } + return 0 +} + +func (m *QueryLimit) GetForceExceeded() bool { + if m != nil { + return m.ForceExceeded + } + return false +} + +func init() { + proto.RegisterType((*KeyValueUpdate)(nil), "kvpb.KeyValueUpdate") + proto.RegisterType((*KeyValueUpdateResult)(nil), "kvpb.KeyValueUpdateResult") + proto.RegisterType((*QueryLimits)(nil), "kvpb.QueryLimits") + proto.RegisterType((*QueryLimit)(nil), "kvpb.QueryLimit") +} +func (m *KeyValueUpdate) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *KeyValueUpdate) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintKv(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Value) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintKv(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + if m.Commit { + dAtA[i] = 0x18 + i++ + if m.Commit { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + return i, nil +} + +func (m *KeyValueUpdateResult) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *KeyValueUpdateResult) MarshalTo(dAtA []byte) (int, error) { + var 
i int + _ = i + var l int + _ = l + if len(m.Key) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintKv(dAtA, i, uint64(len(m.Key))) + i += copy(dAtA[i:], m.Key) + } + if len(m.Old) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintKv(dAtA, i, uint64(len(m.Old))) + i += copy(dAtA[i:], m.Old) + } + if len(m.New) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintKv(dAtA, i, uint64(len(m.New))) + i += copy(dAtA[i:], m.New) + } + return i, nil +} + +func (m *QueryLimits) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryLimits) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.MaxRecentlyQueriedSeriesBlocks != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintKv(dAtA, i, uint64(m.MaxRecentlyQueriedSeriesBlocks.Size())) + n1, err := m.MaxRecentlyQueriedSeriesBlocks.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.MaxRecentlyQueriedSeriesDiskBytesRead != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintKv(dAtA, i, uint64(m.MaxRecentlyQueriedSeriesDiskBytesRead.Size())) + n2, err := m.MaxRecentlyQueriedSeriesDiskBytesRead.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if m.MaxRecentlyQueriedSeriesDiskRead != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintKv(dAtA, i, uint64(m.MaxRecentlyQueriedSeriesDiskRead.Size())) + n3, err := m.MaxRecentlyQueriedSeriesDiskRead.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + } + return i, nil +} + +func (m *QueryLimit) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryLimit) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Limit != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintKv(dAtA, i, uint64(m.Limit)) + } + if 
m.LookbackSeconds != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintKv(dAtA, i, uint64(m.LookbackSeconds)) + } + if m.ForceExceeded { + dAtA[i] = 0x18 + i++ + if m.ForceExceeded { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + return i, nil +} + +func encodeVarintKv(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *KeyValueUpdate) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovKv(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovKv(uint64(l)) + } + if m.Commit { + n += 2 + } + return n +} + +func (m *KeyValueUpdateResult) Size() (n int) { + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovKv(uint64(l)) + } + l = len(m.Old) + if l > 0 { + n += 1 + l + sovKv(uint64(l)) + } + l = len(m.New) + if l > 0 { + n += 1 + l + sovKv(uint64(l)) + } + return n +} + +func (m *QueryLimits) Size() (n int) { + var l int + _ = l + if m.MaxRecentlyQueriedSeriesBlocks != nil { + l = m.MaxRecentlyQueriedSeriesBlocks.Size() + n += 1 + l + sovKv(uint64(l)) + } + if m.MaxRecentlyQueriedSeriesDiskBytesRead != nil { + l = m.MaxRecentlyQueriedSeriesDiskBytesRead.Size() + n += 1 + l + sovKv(uint64(l)) + } + if m.MaxRecentlyQueriedSeriesDiskRead != nil { + l = m.MaxRecentlyQueriedSeriesDiskRead.Size() + n += 1 + l + sovKv(uint64(l)) + } + return n +} + +func (m *QueryLimit) Size() (n int) { + var l int + _ = l + if m.Limit != 0 { + n += 1 + sovKv(uint64(m.Limit)) + } + if m.LookbackSeconds != 0 { + n += 1 + sovKv(uint64(m.LookbackSeconds)) + } + if m.ForceExceeded { + n += 2 + } + return n +} + +func sovKv(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozKv(x uint64) (n int) { + return sovKv(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *KeyValueUpdate) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + 
preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: KeyValueUpdate: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: KeyValueUpdate: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Commit", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 
ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Commit = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *KeyValueUpdateResult) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: KeyValueUpdateResult: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: KeyValueUpdateResult: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Old", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 
64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Old = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field New", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.New = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryLimits) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryLimits: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryLimits: illegal tag %d (wire type %d)", fieldNum, 
wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxRecentlyQueriedSeriesBlocks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MaxRecentlyQueriedSeriesBlocks == nil { + m.MaxRecentlyQueriedSeriesBlocks = &QueryLimit{} + } + if err := m.MaxRecentlyQueriedSeriesBlocks.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxRecentlyQueriedSeriesDiskBytesRead", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MaxRecentlyQueriedSeriesDiskBytesRead == nil { + m.MaxRecentlyQueriedSeriesDiskBytesRead = &QueryLimit{} + } + if err := m.MaxRecentlyQueriedSeriesDiskBytesRead.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxRecentlyQueriedSeriesDiskRead", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return 
ErrInvalidLengthKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MaxRecentlyQueriedSeriesDiskRead == nil { + m.MaxRecentlyQueriedSeriesDiskRead = &QueryLimit{} + } + if err := m.MaxRecentlyQueriedSeriesDiskRead.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryLimit) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryLimit: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryLimit: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Limit", wireType) + } + m.Limit = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Limit |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LookbackSeconds", wireType) + } + m.LookbackSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + 
iNdEx++ + m.LookbackSeconds |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ForceExceeded", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.ForceExceeded = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipKv(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowKv + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowKv + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowKv + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthKv + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowKv + } + if 
iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipKv(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthKv = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowKv = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("github.com/m3db/m3/src/cluster/generated/proto/kvpb/kv.proto", fileDescriptorKv) +} + +var fileDescriptorKv = []byte{ + // 361 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0xcf, 0x6e, 0xda, 0x30, + 0x18, 0x5f, 0xc8, 0x86, 0xc6, 0x87, 0xb6, 0x45, 0x16, 0x9a, 0x38, 0x45, 0x28, 0xda, 0x24, 0x4e, + 0xb1, 0x34, 0xae, 0x3b, 0xa1, 0xed, 0x54, 0x0e, 0xad, 0x51, 0xab, 0x1e, 0x7a, 0x49, 0xec, 0x0f, + 0x1a, 0xc5, 0x89, 0x51, 0xec, 0x50, 0xf2, 0x16, 0x7d, 0x91, 0xbe, 0x47, 0x8f, 0x7d, 0x84, 0x8a, + 0xbe, 0x48, 0x65, 0x83, 0x84, 0xa8, 0xa0, 0xf4, 0x12, 0x7d, 0xbf, 0x5f, 0x7e, 0x7f, 0xe2, 0x7c, + 0x86, 0xbf, 0xf3, 0xcc, 0xdc, 0xd6, 0x69, 0xcc, 0x55, 0x41, 0x8b, 0x91, 0x48, 0x69, 0x31, 0xa2, + 0xba, 0xe2, 0x94, 0xcb, 0x5a, 0x1b, 0xac, 0xe8, 0x1c, 0x4b, 0xac, 0x12, 0x83, 0x82, 0x2e, 0x2a, + 0x65, 0x14, 0xcd, 0x97, 0x8b, 0x94, 0xe6, 0xcb, 0xd8, 0x21, 0xf2, 0xd9, 0xc2, 0xe8, 0x1c, 0xbe, + 0x9f, 0x61, 0x73, 0x95, 0xc8, 0x1a, 0x2f, 0x17, 0x22, 0x31, 0x48, 0x02, 0xf0, 0x73, 0x6c, 0xfa, + 0xde, 0xc0, 0x1b, 0x76, 0x98, 0x1d, 0x49, 0x0f, 0xbe, 0x2c, 0xad, 0xa0, 0xdf, 0x72, 0xdc, 0x06, + 0x90, 0x9f, 0xd0, 0xe6, 0xaa, 0x28, 0x32, 0xd3, 0xf7, 0x07, 0xde, 0xf0, 0x2b, 0xdb, 0xa2, 0x68, + 0x02, 0xbd, 0xfd, 0x44, 
0x86, 0xba, 0x96, 0xe6, 0x40, 0x6e, 0x00, 0xbe, 0x92, 0x62, 0x9b, 0x6a, + 0x47, 0xcb, 0x94, 0x78, 0xe7, 0x02, 0x3b, 0xcc, 0x8e, 0xd1, 0x43, 0x0b, 0xba, 0x17, 0x35, 0x56, + 0xcd, 0x24, 0x2b, 0x32, 0xa3, 0xc9, 0x35, 0x84, 0x45, 0xb2, 0x62, 0xc8, 0xb1, 0x34, 0xb2, 0xb1, + 0x6f, 0x32, 0x14, 0x53, 0xfb, 0xd4, 0x63, 0xa9, 0x78, 0xae, 0x5d, 0x41, 0xf7, 0x4f, 0x10, 0xdb, + 0xe3, 0xc5, 0x3b, 0x2b, 0x3b, 0xe1, 0x23, 0x33, 0xf8, 0x7d, 0x4c, 0xf1, 0x2f, 0xd3, 0xf9, 0xb8, + 0x31, 0xa8, 0x19, 0x26, 0x9b, 0xef, 0x3d, 0x54, 0xf0, 0x31, 0x3b, 0xb9, 0x81, 0xc1, 0x7b, 0x42, + 0x57, 0xe1, 0x1f, 0xa9, 0x38, 0xe9, 0x8c, 0x2a, 0x80, 0x9d, 0xde, 0x6e, 0x4e, 0xda, 0xc1, 0xfd, + 0x14, 0x9f, 0x6d, 0x00, 0x19, 0xc2, 0x0f, 0xa9, 0x54, 0x9e, 0x26, 0x3c, 0x9f, 0x22, 0x57, 0xa5, + 0xd0, 0xee, 0x4c, 0x3e, 0x7b, 0x4b, 0x93, 0x5f, 0xf0, 0x6d, 0xa6, 0x2a, 0x8e, 0xff, 0x57, 0x1c, + 0x51, 0xa0, 0xd8, 0xae, 0x7a, 0x9f, 0x1c, 0x07, 0x8f, 0xeb, 0xd0, 0x7b, 0x5a, 0x87, 0xde, 0xf3, + 0x3a, 0xf4, 0xee, 0x5f, 0xc2, 0x4f, 0x69, 0xdb, 0x5d, 0xb1, 0xd1, 0x6b, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x4e, 0xb0, 0xcd, 0x62, 0xa2, 0x02, 0x00, 0x00, +} diff --git a/src/cluster/generated/proto/kvpb/kv.proto b/src/cluster/generated/proto/kvpb/kv.proto new file mode 100644 index 0000000000..ef2b2f60d5 --- /dev/null +++ b/src/cluster/generated/proto/kvpb/kv.proto @@ -0,0 +1,46 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +syntax = "proto3"; + +package kvpb; + +message KeyValueUpdate { + string key = 1; + string value = 2; + bool commit = 3; +} + +message KeyValueUpdateResult { + string key = 1; + string old = 2; + string new = 3; +} + +message QueryLimits { + QueryLimit maxRecentlyQueriedSeriesBlocks = 1; + QueryLimit maxRecentlyQueriedSeriesDiskBytesRead = 2; + QueryLimit maxRecentlyQueriedSeriesDiskRead = 3; +} + +message QueryLimit { + int64 limit = 1; + int64 lookbackSeconds = 2; + bool forceExceeded = 3; +} diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index 678aa3b183..ea8421cec3 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/dbnode/kvconfig/keys.go b/src/dbnode/kvconfig/keys.go index e607be8d38..7d90d48cab 100644 --- a/src/dbnode/kvconfig/keys.go +++ b/src/dbnode/kvconfig/keys.go @@ -49,4 +49,7 @@ const ( // ClientWriteConsistencyLevel is the KV config key for the runtime // configuration specifying the client write consistency level ClientWriteConsistencyLevel = "m3db.client.write-consistency-level" + + // QueryLimits is the KV config key for query limits enforced on each dbnode. 
+ QueryLimits = "m3db.query.limits" ) diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 535ca71df9..93dcce34e8 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -595,7 +595,6 @@ func (r *blockRetriever) streamRequest( shard uint32, id ident.ID, startTime time.Time, - nsCtx namespace.Context, ) error { req.resultWg.Add(1) if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { @@ -672,7 +671,7 @@ func (r *blockRetriever) Stream( } } - err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx) + err = r.streamRequest(ctx, req, shard, id, startTime) if err != nil { req.resultWg.Done() return xio.EmptyBlockReader, err @@ -708,7 +707,7 @@ func (r *blockRetriever) StreamWideEntry( req.streamReqType = streamWideEntryReq req.wideFilter = filter - err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx) + err = r.streamRequest(ctx, req, shard, id, startTime) if err != nil { req.resultWg.Done() return block.EmptyStreamedWideEntry, err diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 5e7178050a..0fdca0196c 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -820,7 +820,7 @@ func TestLimitSeriesReadFromDisk(t *testing.T) { SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()). SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()). 
SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{ - Limit: 1, + Limit: 2, Lookback: time.Second * 1, }) queryLimits, err := limits.NewQueryLimits(limitOpts) @@ -833,8 +833,8 @@ func TestLimitSeriesReadFromDisk(t *testing.T) { require.NoError(t, err) req := &retrieveRequest{} retriever := publicRetriever.(*blockRetriever) - _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) - err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) + _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now()) + err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now()) require.Error(t, err) require.Contains(t, err.Error(), "query aborted due to limit") diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 7b50475ec1..1a9e5cd378 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -39,6 +39,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/generated/proto/commonpb" + "github.com/m3db/m3/src/cluster/generated/proto/kvpb" "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3dbnode/config" queryconfig "github.com/m3db/m3/src/cmd/services/m3query/config" @@ -993,6 +994,7 @@ func Run(runOpts RunOptions) { runtimeOptsMgr, cfg.Limits.WriteNewSeriesPerSecond) kvWatchEncodersPerBlockLimit(syncCfg.KVStore, logger, runtimeOptsMgr, cfg.Limits.MaxEncodersPerBlock) + kvWatchQueryLimit(syncCfg.KVStore, logger, queryLimits, limitOpts) }() // Wait for process interrupt. 
@@ -1165,6 +1167,100 @@ func kvWatchEncodersPerBlockLimit( }() } +func kvWatchQueryLimit( + store kv.Store, + logger *zap.Logger, + limits limits.QueryLimits, + defaultOpts limits.Options, +) { + value, err := store.Get(kvconfig.QueryLimits) + if err == nil { + dynamicLimits := &kvpb.QueryLimits{} + err = value.Unmarshal(dynamicLimits) + if err == nil { + updateQueryLimits(logger, limits, dynamicLimits, defaultOpts) + } + } else if !errors.Is(err, kv.ErrNotFound) { + logger.Warn("error resolving query limit", zap.Error(err)) + } + + watch, err := store.Watch(kvconfig.QueryLimits) + if err != nil { + logger.Error("could not watch query limit", zap.Error(err)) + return + } + + go func() { + dynamicLimits := &kvpb.QueryLimits{} + for range watch.C() { + if newValue := watch.Get(); newValue != nil { + if err := newValue.Unmarshal(dynamicLimits); err != nil { + logger.Warn("unable to parse new query limits", zap.Error(err)) + continue + } + updateQueryLimits(logger, limits, dynamicLimits, defaultOpts) + } + } + }() +} + +func updateQueryLimits(logger *zap.Logger, + queryLimits limits.QueryLimits, + dynamicOpts *kvpb.QueryLimits, + configOpts limits.Options, +) { + var ( + // Default to the config-based limits if unset in dynamic limits. + // Otherwise, use the dynamic limit. 
+ docsLimitOpts = configOpts.DocsLimitOpts() + diskSeriesReadLimitOpts = configOpts.DiskSeriesReadLimitOpts() + bytesReadLimitOpts = configOpts.BytesReadLimitOpts() + ) + if dynamicOpts != nil { + if dynamicOpts.MaxRecentlyQueriedSeriesBlocks != nil { + docsLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesBlocks) + } + if dynamicOpts.MaxRecentlyQueriedSeriesDiskRead != nil { + diskSeriesReadLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesDiskRead) + } + if dynamicOpts.MaxRecentlyQueriedSeriesDiskBytesRead != nil { + bytesReadLimitOpts = dynamicLimitToLimitOpts(dynamicOpts.MaxRecentlyQueriedSeriesDiskBytesRead) + } + } + + if err := updateQueryLimit(queryLimits.DocsLimit(), docsLimitOpts); err != nil { + logger.Error("error updating docs limit", zap.Error(err)) + } + + if err := updateQueryLimit(queryLimits.DiskSeriesReadLimit(), diskSeriesReadLimitOpts); err != nil { + logger.Error("error updating series read limit", zap.Error(err)) + } + + if err := updateQueryLimit(queryLimits.BytesReadLimit(), bytesReadLimitOpts); err != nil { + logger.Error("error updating bytes read limit", zap.Error(err)) + } +} + +func updateQueryLimit( + limit limits.LookbackLimit, + newOpts limits.LookbackLimitOptions, +) error { + old := limit.Options() + if old.Equals(newOpts) { + return nil + } + + return limit.Update(newOpts) +} + +func dynamicLimitToLimitOpts(dynamicLimit *kvpb.QueryLimit) limits.LookbackLimitOptions { + return limits.LookbackLimitOptions{ + Limit: dynamicLimit.Limit, + Lookback: time.Duration(dynamicLimit.LookbackSeconds) * time.Second, + ForceExceeded: dynamicLimit.ForceExceeded, + } +} + func kvWatchClientConsistencyLevels( store kv.Store, logger *zap.Logger, diff --git a/src/dbnode/storage/limits/noop_query_limits.go b/src/dbnode/storage/limits/noop_query_limits.go index cf0a9497a6..b1c1787254 100644 --- a/src/dbnode/storage/limits/noop_query_limits.go +++ b/src/dbnode/storage/limits/noop_query_limits.go @@ -58,6 
+58,14 @@ func (q *noOpQueryLimits) Stop() { func (q *noOpQueryLimits) Start() { } +func (q *noOpLookbackLimit) Options() LookbackLimitOptions { + return LookbackLimitOptions{} +} + +func (q *noOpLookbackLimit) Update(LookbackLimitOptions) error { + return nil +} + func (q *noOpLookbackLimit) Inc(int, []byte) error { return nil } diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index 3b4201729c..5de39875a5 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -22,17 +22,20 @@ package limits import ( "fmt" + "sync" "time" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" - "github.com/uber-go/tally" "go.uber.org/atomic" + "go.uber.org/zap" + + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" ) const ( - defaultLookback = time.Second * 15 + disabledLimitValue = 0 + defaultLookback = time.Second * 15 ) type queryLimits struct { @@ -42,18 +45,23 @@ type queryLimits struct { } type lookbackLimit struct { - name string - options LookbackLimitOptions - metrics lookbackLimitMetrics - recent *atomic.Int64 - stopCh chan struct{} + name string + options LookbackLimitOptions + metrics lookbackLimitMetrics + logger *zap.Logger + recent *atomic.Int64 + stopCh chan struct{} + stoppedCh chan struct{} + lock sync.RWMutex } type lookbackLimitMetrics struct { - recentCount tally.Gauge - recentMax tally.Gauge - total tally.Counter - exceeded tally.Counter + optionsLimit tally.Gauge + optionsLookback tally.Gauge + recentCount tally.Gauge + recentMax tally.Gauge + total tally.Counter + exceeded tally.Counter sourceLogger SourceLogger } @@ -67,7 +75,7 
@@ var ( func DefaultLookbackLimitOptions() LookbackLimitOptions { return LookbackLimitOptions{ // Default to no limit. - Limit: 0, + Limit: disabledLimitValue, Lookback: defaultLookback, } } @@ -107,11 +115,13 @@ func newLookbackLimit( sourceLoggerBuilder SourceLoggerBuilder, ) *lookbackLimit { return &lookbackLimit{ - name: name, - options: opts, - metrics: newLookbackLimitMetrics(instrumentOpts, name, sourceLoggerBuilder), - recent: atomic.NewInt64(0), - stopCh: make(chan struct{}), + name: name, + options: opts, + metrics: newLookbackLimitMetrics(instrumentOpts, name, sourceLoggerBuilder), + logger: instrumentOpts.Logger(), + recent: atomic.NewInt64(0), + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), } } @@ -128,10 +138,12 @@ func newLookbackLimitMetrics( instrumentOpts.SetMetricsScope(scope)) return lookbackLimitMetrics{ - recentCount: scope.Gauge(fmt.Sprintf("recent-count-%s", name)), - recentMax: scope.Gauge(fmt.Sprintf("recent-max-%s", name)), - total: scope.Counter(fmt.Sprintf("total-%s", name)), - exceeded: scope.Tagged(map[string]string{"limit": name}).Counter("exceeded"), + optionsLimit: scope.Gauge(fmt.Sprintf("current-limit%s", name)), + optionsLookback: scope.Gauge(fmt.Sprintf("current-lookback-%s", name)), + recentCount: scope.Gauge(fmt.Sprintf("recent-count-%s", name)), + recentMax: scope.Gauge(fmt.Sprintf("recent-max-%s", name)), + total: scope.Counter(fmt.Sprintf("total-%s", name)), + exceeded: scope.Tagged(map[string]string{"limit": name}).Counter("exceeded"), sourceLogger: sourceLogger, } @@ -150,15 +162,19 @@ func (q *queryLimits) DiskSeriesReadLimit() LookbackLimit { } func (q *queryLimits) Start() { - q.docsLimit.start() - q.seriesDiskReadLimit.start() - q.bytesReadLimit.start() + // Lock on explicit start to avoid any collision with asynchronous updating + // which will call stop/start if the lookback has changed. 
+ q.docsLimit.startWithLock() + q.seriesDiskReadLimit.startWithLock() + q.bytesReadLimit.startWithLock() } func (q *queryLimits) Stop() { - q.docsLimit.stop() - q.seriesDiskReadLimit.stop() - q.bytesReadLimit.stop() + // Lock on explicit stop to avoid any collision with asynchronous updating + // which will call stop/start if the lookback has changed. + q.docsLimit.stopWithLock() + q.seriesDiskReadLimit.stopWithLock() + q.bytesReadLimit.stopWithLock() } func (q *queryLimits) AnyExceeded() error { @@ -171,6 +187,39 @@ func (q *queryLimits) AnyExceeded() error { return q.bytesReadLimit.exceeded() } +func (q *lookbackLimit) Options() LookbackLimitOptions { + q.lock.RLock() + o := q.options + q.lock.RUnlock() + return o +} + +// Update updates the limit. +func (q *lookbackLimit) Update(opts LookbackLimitOptions) error { + if err := opts.validate(); err != nil { + return err + } + + q.lock.Lock() + defer q.lock.Unlock() + + old := q.options + q.options = opts + + // If the lookback changed, replace the background goroutine that manages the periodic resetting. + if q.options.Lookback != old.Lookback { + q.stop() + q.start() + } + + q.logger.Info("query limit options updated", + zap.String("name", q.name), + zap.Any("new", opts), + zap.Any("old", old)) + + return nil +} + // Inc increments the current value and returns an error if above the limit. 
func (q *lookbackLimit) Inc(val int, source []byte) error { if val < 0 { @@ -199,7 +248,21 @@ func (q *lookbackLimit) exceeded() error { } func (q *lookbackLimit) checkLimit(recent int64) error { - if q.options.Limit > 0 && recent > q.options.Limit { + q.lock.RLock() + currentOpts := q.options + q.lock.RUnlock() + + if currentOpts.ForceExceeded { + q.metrics.exceeded.Inc(1) + return xerrors.NewInvalidParamsError(NewQueryLimitExceededError(fmt.Sprintf( + "query aborted due to forced limit: name=%s", q.name))) + } + + if currentOpts.Limit == disabledLimitValue { + return nil + } + + if recent >= currentOpts.Limit { q.metrics.exceeded.Inc(1) return xerrors.NewInvalidParamsError(NewQueryLimitExceededError(fmt.Sprintf( "query aborted due to limit: name=%s, limit=%d, current=%d, within=%s", @@ -208,23 +271,45 @@ func (q *lookbackLimit) checkLimit(recent int64) error { return nil } +func (q *lookbackLimit) startWithLock() { + q.lock.Lock() + defer q.lock.Unlock() + q.start() +} + +func (q *lookbackLimit) stopWithLock() { + q.lock.Lock() + defer q.lock.Unlock() + q.stop() +} + func (q *lookbackLimit) start() { ticker := time.NewTicker(q.options.Lookback) go func() { + q.logger.Info("query limit interval started", zap.String("name", q.name)) for { select { case <-ticker.C: q.reset() case <-q.stopCh: ticker.Stop() + q.stoppedCh <- struct{}{} return } } }() + + q.metrics.optionsLimit.Update(float64(q.options.Limit)) + q.metrics.optionsLookback.Update(q.options.Lookback.Seconds()) } func (q *lookbackLimit) stop() { close(q.stopCh) + <-q.stoppedCh + q.stopCh = make(chan struct{}) + q.stoppedCh = make(chan struct{}) + + q.logger.Info("query limit interval stopped", zap.String("name", q.name)) } func (q *lookbackLimit) current() int64 { @@ -243,6 +328,13 @@ func (q *lookbackLimit) reset() { q.recent.Store(0) } +// Equals returns true if the other options match the current. 
+func (opts LookbackLimitOptions) Equals(other LookbackLimitOptions) bool { + return opts.Limit == other.Limit && + opts.Lookback == other.Lookback && + opts.ForceExceeded == other.ForceExceeded +} + func (opts LookbackLimitOptions) validate() error { if opts.Limit < 0 { return fmt.Errorf("query limit requires limit >= 0 (%d)", opts.Limit) diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 23ee91c2ef..6ce32708a9 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -48,16 +48,17 @@ func testQueryLimitOptions( } func TestQueryLimits(t *testing.T) { + l := int64(1) docOpts := LookbackLimitOptions{ - Limit: 1, + Limit: l, Lookback: time.Second, } bytesOpts := LookbackLimitOptions{ - Limit: 1, + Limit: l, Lookback: time.Second, } seriesOpts := LookbackLimitOptions{ - Limit: 1, + Limit: l, Lookback: time.Second, } opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) @@ -110,41 +111,50 @@ func TestQueryLimits(t *testing.T) { func TestLookbackLimit(t *testing.T) { for _, test := range []struct { - name string - limit int64 + name string + limit int64 + forceExceeded bool }{ {name: "no limit", limit: 0}, {name: "limit", limit: 5}, + {name: "force exceeded limit", limit: 5, forceExceeded: true}, } { t.Run(test.name, func(t *testing.T) { scope := tally.NewTestScope("", nil) iOpts := instrument.NewOptions().SetMetricsScope(scope) opts := LookbackLimitOptions{ - Limit: test.limit, - Lookback: time.Millisecond * 100, + Limit: test.limit, + Lookback: time.Millisecond * 100, + ForceExceeded: test.forceExceeded, } name := "test" limit := newLookbackLimit(iOpts, opts, name, &sourceLoggerBuilder{}) require.Equal(t, int64(0), limit.current()) + + var exceededCount int64 err := limit.exceeded() - require.NoError(t, err) + if test.limit >= 0 && !test.forceExceeded { + require.NoError(t, err) + } else { + require.Error(t, err) + 
exceededCount++ + } // Validate ascending while checking limits. - var exceededCount int64 - exceededCount += verifyLimit(t, limit, 3, test.limit) + exceededCount += verifyLimit(t, limit, 3, test.limit, test.forceExceeded) require.Equal(t, int64(3), limit.current()) verifyMetrics(t, scope, name, 3, 0, 3, exceededCount) - exceededCount += verifyLimit(t, limit, 2, test.limit) + exceededCount += verifyLimit(t, limit, 2, test.limit, test.forceExceeded) require.Equal(t, int64(5), limit.current()) verifyMetrics(t, scope, name, 5, 0, 5, exceededCount) - exceededCount += verifyLimit(t, limit, 1, test.limit) + exceededCount += verifyLimit(t, limit, 1, test.limit, test.forceExceeded) require.Equal(t, int64(6), limit.current()) verifyMetrics(t, scope, name, 6, 0, 6, exceededCount) - exceededCount += verifyLimit(t, limit, 4, test.limit) + exceededCount += verifyLimit(t, limit, 4, test.limit, test.forceExceeded) require.Equal(t, int64(10), limit.current()) verifyMetrics(t, scope, name, 10, 0, 10, exceededCount) @@ -154,11 +164,11 @@ func TestLookbackLimit(t *testing.T) { verifyMetrics(t, scope, name, 0, 10, 10, exceededCount) // Validate ascending again post-reset. 
- exceededCount += verifyLimit(t, limit, 2, test.limit) + exceededCount += verifyLimit(t, limit, 2, test.limit, test.forceExceeded) require.Equal(t, int64(2), limit.current()) verifyMetrics(t, scope, name, 2, 10, 12, exceededCount) - exceededCount += verifyLimit(t, limit, 5, test.limit) + exceededCount += verifyLimit(t, limit, 5, test.limit, test.forceExceeded) require.Equal(t, int64(7), limit.current()) verifyMetrics(t, scope, name, 7, 10, 17, exceededCount) @@ -173,14 +183,37 @@ func TestLookbackLimit(t *testing.T) { require.Equal(t, int64(0), limit.current()) verifyMetrics(t, scope, name, 0, 0, 17, exceededCount) + + limit.reset() + + opts.Limit = 0 + require.NoError(t, limit.Update(opts)) + + exceededCount += verifyLimit(t, limit, 0, opts.Limit, test.forceExceeded) + require.Equal(t, int64(0), limit.current()) + + opts.Limit = 2 + require.NoError(t, limit.Update(opts)) + + exceededCount += verifyLimit(t, limit, 1, opts.Limit, test.forceExceeded) + require.Equal(t, int64(1), limit.current()) + verifyMetrics(t, scope, name, 1, 0, 18, exceededCount) + + exceededCount += verifyLimit(t, limit, 1, opts.Limit, test.forceExceeded) + require.Equal(t, int64(2), limit.current()) + verifyMetrics(t, scope, name, 2, 0, 19, exceededCount) + + exceededCount += verifyLimit(t, limit, 1, opts.Limit, test.forceExceeded) + require.Equal(t, int64(3), limit.current()) + verifyMetrics(t, scope, name, 3, 0, 20, exceededCount) }) } } -func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int64) int64 { +func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int64, forceExceeded bool) int64 { var exceededCount int64 err := limit.Inc(inc, nil) - if limit.current() <= expectedLimit || expectedLimit == 0 { + if (expectedLimit == 0 || limit.current() < expectedLimit) && !forceExceeded { require.NoError(t, err) } else { require.Error(t, err) @@ -188,8 +221,9 @@ func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int6 
require.True(t, IsQueryLimitExceededError(err)) exceededCount++ } + err = limit.exceeded() - if limit.current() <= expectedLimit || expectedLimit == 0 { + if (expectedLimit == 0 || limit.current() < expectedLimit) && !forceExceeded { require.NoError(t, err) } else { require.Error(t, err) diff --git a/src/dbnode/storage/limits/types.go b/src/dbnode/storage/limits/types.go index aa87c493bf..22b46b4b57 100644 --- a/src/dbnode/storage/limits/types.go +++ b/src/dbnode/storage/limits/types.go @@ -52,16 +52,23 @@ type QueryLimits interface { // LookbackLimit provides an interface for a specific query limit. type LookbackLimit interface { + // Options returns the current limit options. + Options() LookbackLimitOptions // Inc increments the recent value for the limit. Inc(new int, source []byte) error + // Update changes the lookback limit settings. + Update(opts LookbackLimitOptions) error } // LookbackLimitOptions holds options for a lookback limit to be enforced. type LookbackLimitOptions struct { // Limit past which errors will be returned. + // Zero disables the limit. Limit int64 // Lookback is the period over which the limit is enforced. Lookback time.Duration + // ForceExceeded, if true, makes all calls to the limit behave as though the limit is exceeded. + ForceExceeded bool } // SourceLoggerBuilder builds a SourceLogger given instrument options. diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index 70854ea06b..5572141176 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -18,6 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +// Package database contains API endpoints for managing the database. package database import ( @@ -54,6 +55,8 @@ func RegisterRoutes( return err } + kvStoreHandler := NewKeyValueStoreHandler(client, instrumentOpts) + // Register the same handler under two different endpoints. 
This just makes explaining things in // our documentation easier so we can separate out concepts, but share the underlying code. if err := r.Register(queryhttp.RegisterOptions{ @@ -70,6 +73,13 @@ func RegisterRoutes( }); err != nil { return err } + if err := r.Register(queryhttp.RegisterOptions{ + Path: KeyValueStoreURL, + Handler: kvStoreHandler, + Methods: []string{KeyValueStoreHTTPMethod}, + }); err != nil { + return err + } return nil } diff --git a/src/query/api/v1/handler/database/kvstore.go b/src/query/api/v1/handler/database/kvstore.go new file mode 100644 index 0000000000..8b3c5a9b0e --- /dev/null +++ b/src/query/api/v1/handler/database/kvstore.go @@ -0,0 +1,204 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package database + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + + "github.com/gogo/protobuf/jsonpb" + "go.uber.org/zap" + "google.golang.org/protobuf/runtime/protoiface" + + clusterclient "github.com/m3db/m3/src/cluster/client" + "github.com/m3db/m3/src/cluster/generated/proto/commonpb" + "github.com/m3db/m3/src/cluster/generated/proto/kvpb" + "github.com/m3db/m3/src/cluster/kv" + nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" + "github.com/m3db/m3/src/dbnode/kvconfig" + "github.com/m3db/m3/src/query/api/v1/handler" + "github.com/m3db/m3/src/query/util/logging" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + xhttp "github.com/m3db/m3/src/x/net/http" +) + +const ( + // KeyValueStoreURL is the url to edit key/value configuration values. + KeyValueStoreURL = handler.RoutePrefixV1 + "/kvstore" + // KeyValueStoreHTTPMethod is the HTTP method used with this resource. + KeyValueStoreHTTPMethod = http.MethodPost +) + +// KeyValueUpdate defines an update to a key's value. +type KeyValueUpdate struct { + // Key to update. + Key string `json:"key"` + // Value to update the key to. + Value json.RawMessage `json:"value"` + // Commit, if false, will not persist the update. If true, the + // update will be persisted. Used to test format of inputs. + Commit bool `json:"commit"` +} + +// KeyValueUpdateResult defines the result of an update to a key's value. +type KeyValueUpdateResult struct { + // Key to update. + Key string `json:"key"` + // Old is the value before the update. + Old string `json:"old"` + // New is the value after the update. + New string `json:"new"` + // Version of the key. 
+ Version int `json:"version"` +} + +// KeyValueStoreHandler represents a handler for the key/value store endpoint +type KeyValueStoreHandler struct { + client clusterclient.Client + instrumentOpts instrument.Options +} + +// NewKeyValueStoreHandler returns a new instance of handler +func NewKeyValueStoreHandler( + client clusterclient.Client, + instrumentOpts instrument.Options, +) http.Handler { + return &KeyValueStoreHandler{ + client: client, + instrumentOpts: instrumentOpts, + } +} + +func (h *KeyValueStoreHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + logger := logging.WithContext(r.Context(), h.instrumentOpts) + + update, err := h.parseBody(r) + if err != nil { + logger.Error("unable to parse request", zap.Error(err)) + xhttp.WriteError(w, err) + return + } + + kvStore, err := h.client.KV() + if err != nil { + logger.Error("unable to get kv store", zap.Error(err)) + xhttp.WriteError(w, err) + return + } + + results, err := h.update(logger, kvStore, update) + if err != nil { + logger.Error("kv store error", + zap.Error(err), + zap.Any("update", update)) + xhttp.WriteError(w, err) + return + } + + xhttp.WriteJSONResponse(w, results, logger) +} + +func (h *KeyValueStoreHandler) parseBody(r *http.Request) (*KeyValueUpdate, error) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + defer r.Body.Close() + + var parsed KeyValueUpdate + if err := json.Unmarshal(body, &parsed); err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + + return &parsed, nil +} + +func (h *KeyValueStoreHandler) update( + logger *zap.Logger, + kvStore kv.Store, + update *KeyValueUpdate, +) (*KeyValueUpdateResult, error) { + old, err := kvStore.Get(update.Key) + if err != nil && !errors.Is(err, kv.ErrNotFound) { + return nil, err + } + + oldProto, err := newKVProtoMessage(update.Key) + if err != nil { + return nil, err + } + + if old != nil { + if err := old.Unmarshal(oldProto); err != nil { + // Only 
log so we can overwrite corrupt existing entries. + logger.Error("cannot unmarshal old kv proto", zap.Error(err), zap.String("key", update.Key)) + } + } + + newProto, err := newKVProtoMessage(update.Key) + if err != nil { + return nil, err + } + + if err := jsonpb.UnmarshalString(string([]byte(update.Value)), newProto); err != nil { + return nil, err + } + + var version int + if update.Commit { + version, err = kvStore.Set(update.Key, newProto) + if err != nil { + return nil, err + } + } + + result := KeyValueUpdateResult{ + Key: update.Key, + Old: oldProto.String(), + New: newProto.String(), + Version: version, + } + + logger.Info("kv store", zap.Any("update", *update), zap.Any("result", result)) + + return &result, nil +} + +func newKVProtoMessage(key string) (protoiface.MessageV1, error) { + switch key { + case kvconfig.NamespacesKey: + return &nsproto.Registry{}, nil + case kvconfig.ClusterNewSeriesInsertLimitKey: + case kvconfig.EncodersPerBlockLimitKey: + return &commonpb.Int64Proto{}, nil + case kvconfig.ClientBootstrapConsistencyLevel: + case kvconfig.ClientReadConsistencyLevel: + case kvconfig.ClientWriteConsistencyLevel: + return &commonpb.StringProto{}, nil + case kvconfig.QueryLimits: + return &kvpb.QueryLimits{}, nil + } + return nil, fmt.Errorf("unsupported kvstore key %s", key) +} diff --git a/src/query/api/v1/handler/database/kvstore_test.go b/src/query/api/v1/handler/database/kvstore_test.go new file mode 100644 index 0000000000..4a6f047bab --- /dev/null +++ b/src/query/api/v1/handler/database/kvstore_test.go @@ -0,0 +1,186 @@ +// Copyright (c) 2021 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package database + +import ( + "encoding/json" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/m3db/m3/src/cluster/generated/proto/kvpb" + "github.com/m3db/m3/src/cluster/kv" + "github.com/m3db/m3/src/dbnode/kvconfig" +) + +func TestUpdateQueryLimits(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + tests := []struct { + name string + limits *kvpb.QueryLimits + commit bool + expectedJSON string + expectedError string + }{ + { + name: `nil`, + limits: nil, + commit: true, + expectedJSON: "", + }, + { + name: `empty`, + limits: &kvpb.QueryLimits{}, + commit: true, + expectedJSON: "", + }, + { + name: `only block - commit`, + limits: &kvpb.QueryLimits{ + MaxRecentlyQueriedSeriesBlocks: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + }, + commit: true, + expectedJSON: `maxRecentlyQueriedSeriesBlocks: `, + }, + { + name: `only block - no commit`, + limits: &kvpb.QueryLimits{ + MaxRecentlyQueriedSeriesBlocks: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + }, + commit: false, + expectedJSON: `maxRecentlyQueriedSeriesBlocks: `, + }, + { + name: `all - commit`, + limits: &kvpb.QueryLimits{ + MaxRecentlyQueriedSeriesBlocks: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + MaxRecentlyQueriedSeriesDiskBytesRead: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + MaxRecentlyQueriedSeriesDiskRead: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + }, + commit: true, + // nolint: lll + expectedJSON: `maxRecentlyQueriedSeriesBlocks: maxRecentlyQueriedSeriesDiskBytesRead: maxRecentlyQueriedSeriesDiskRead: `, + }, + { + name: `all - no commit`, + limits: &kvpb.QueryLimits{ + MaxRecentlyQueriedSeriesBlocks: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + 
MaxRecentlyQueriedSeriesDiskBytesRead: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + MaxRecentlyQueriedSeriesDiskRead: &kvpb.QueryLimit{ + Limit: 1, + LookbackSeconds: 15, + ForceExceeded: true, + }, + }, + commit: false, + // nolint: lll + expectedJSON: `maxRecentlyQueriedSeriesBlocks: maxRecentlyQueriedSeriesDiskBytesRead: maxRecentlyQueriedSeriesDiskRead: `, + }, + } + + for _, test := range tests { + limitJSON, err := json.Marshal(test.limits) + require.NoError(t, err) + + update := &KeyValueUpdate{ + Key: kvconfig.QueryLimits, + Value: json.RawMessage(limitJSON), + Commit: test.commit, + } + + storeMock := kv.NewMockStore(ctrl) + + // (A) test no old value. + storeMock.EXPECT().Get(kvconfig.QueryLimits).Return(nil, kv.ErrNotFound) + if test.commit { + storeMock.EXPECT().Set(kvconfig.QueryLimits, gomock.Any()).Return(0, nil) + } + + handler := &KeyValueStoreHandler{} + r, err := handler.update(zap.NewNop(), storeMock, update) + require.NoError(t, err) + require.Equal(t, kvconfig.QueryLimits, r.Key) + require.Equal(t, "", r.Old) + require.Equal(t, test.expectedJSON, r.New) + require.Equal(t, 0, r.Version) + + // (B) test old value. 
+ mockVal := kv.NewMockValue(ctrl) + storeMock.EXPECT().Get(kvconfig.QueryLimits).Return(mockVal, nil) + mockVal.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v *kvpb.QueryLimits) error { + v.MaxRecentlyQueriedSeriesBlocks = &kvpb.QueryLimit{ + Limit: 10, + LookbackSeconds: 30, + ForceExceeded: false, + } + v.MaxRecentlyQueriedSeriesDiskBytesRead = &kvpb.QueryLimit{ + Limit: 100, + LookbackSeconds: 300, + ForceExceeded: false, + } + return nil + }) + if test.commit { + storeMock.EXPECT().Set(kvconfig.QueryLimits, gomock.Any()).Return(0, nil) + } + + handler = &KeyValueStoreHandler{} + r, err = handler.update(zap.NewNop(), storeMock, update) + require.NoError(t, err) + require.Equal(t, kvconfig.QueryLimits, r.Key) + // nolint: lll + require.Equal(t, `maxRecentlyQueriedSeriesBlocks: maxRecentlyQueriedSeriesDiskBytesRead: `, r.Old) + require.Equal(t, test.expectedJSON, r.New) + require.Equal(t, 0, r.Version) + } +}