From 4850f892837bdcc3dd28b9f31b79a1ea2eb7eeb7 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 5 Sep 2024 11:39:03 +0800 Subject: [PATCH] enhance: Add delete buffer related quota logic (#35918) See also #35303 --------- Signed-off-by: Congqi Xia --- configs/milvus.yaml | 8 ++ internal/querynodev2/delegator/delegator.go | 5 + .../delegator/deletebuffer/delete_buffer.go | 58 ++++++++-- .../deletebuffer/delete_buffer_test.go | 6 ++ .../delegator/deletebuffer/delete_item.go | 10 ++ .../deletebuffer/delete_item_test.go | 3 + .../deletebuffer/list_delete_buffer.go | 12 +++ .../deletebuffer/list_delete_buffer_test.go | 17 +++ .../querynodev2/delegator/mock_delegator.go | 51 +++++++++ internal/querynodev2/metrics_info.go | 16 +++ internal/querynodev2/services_test.go | 25 ++++- internal/rootcoord/quota_center.go | 59 +++++++++++ pkg/util/metricsinfo/quota_metric.go | 6 ++ pkg/util/paramtable/quota_param.go | 100 ++++++++++++++---- 14 files changed, 344 insertions(+), 32 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 60c3010fbf2a0..4207c90fdf318 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -973,6 +973,14 @@ quotaAndLimits: enabled: false # switch to enable l0 segment row count quota lowWaterLevel: 32768 # l0 segment row count quota, low water level highWaterLevel: 65536 # l0 segment row count quota, low water level + deleteBufferRowCountProtection: + enabled: false # switch to enable delete buffer row count quota + lowWaterLevel: 32768 # delete buffer row count quota, low water level + highWaterLevel: 65536 # delete buffer row count quota, high water level + deleteBufferSizeProtection: + enabled: false # switch to enable delete buffer size quota + lowWaterLevel: 134217728 # delete buffer size quota, low water level + highWaterLevel: 268435456 # delete buffer size quota, high water level limitReading: # forceDeny false means dql requests are allowed (except for some # specific conditions, such as collection has been dropped), true means always reject all dql requests. diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 1d11b4c5add68..2231253978e01 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -80,6 +80,7 @@ type ShardDelegator interface { ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) GetTargetVersion() int64 + GetDeleteBufferSize() (entryNum int64, memorySize int64) // manage exclude segments AddExcludedSegments(excludeInfo map[int64]uint64) @@ -585,6 +586,10 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta return results, nil } +func (sd *shardDelegator) GetDeleteBufferSize() (entryNum int64, memorySize int64) { + return sd.deleteBuffer.Size() +} + type subTask[T any] struct { req T targetID int64 diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go index cecc03ea19a82..91d1e8e687ef3 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_buffer.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer.go @@ -28,6 +28,7 @@ var errBufferFull = errors.New("buffer full") type timed interface { Timestamp() uint64 Size() int64 + EntryNum() int64 } // DeleteBuffer is the interface for delete buffer. @@ -36,6 +37,8 @@ type DeleteBuffer[T timed] interface { ListAfter(uint64) []T SafeTs() uint64 TryDiscard(uint64) + // Size returns current size information of delete buffer: entryNum and memory + Size() (entryNum, memorySize int64) } func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] { @@ -86,31 +89,59 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T { return result } +func (c *doubleCacheBuffer[T]) Size() (entryNum int64, memorySize int64) { + c.mut.RLock() + defer c.mut.RUnlock() + + if c.head != nil { + blockNum, blockSize := c.head.Size() + entryNum += blockNum + memorySize += blockSize + } + + if c.tail != nil { + blockNum, blockSize := c.tail.Size() + entryNum += blockNum + memorySize += blockSize + } + + return entryNum, memorySize +} + // evict sets head as tail and evicts tail. func (c *doubleCacheBuffer[T]) evict(newTs uint64, entry T) { c.tail = c.head c.head = &cacheBlock[T]{ - headTs: newTs, - maxSize: c.maxSize / 2, - size: entry.Size(), - data: []T{entry}, + headTs: newTs, + maxSize: c.maxSize / 2, + size: entry.Size(), + entryNum: entry.EntryNum(), + data: []T{entry}, } c.ts = c.tail.headTs } func newCacheBlock[T timed](ts uint64, maxSize int64, elements ...T) *cacheBlock[T] { + var entryNum, memorySize int64 + for _, element := range elements { + entryNum += element.EntryNum() + memorySize += element.Size() + } return &cacheBlock[T]{ - headTs: ts, - maxSize: maxSize, - data: elements, + headTs: ts, + maxSize: maxSize, + data: elements, + entryNum: entryNum, + size: memorySize, } } type cacheBlock[T timed] struct { - mut sync.RWMutex - headTs uint64 - size int64 - maxSize int64 + mut sync.RWMutex + headTs uint64 + entryNum int64 + size int64 + maxSize int64 data []T } @@ -127,6 +158,7 @@ func (c *cacheBlock[T]) Put(entry T) error { c.data = append(c.data, entry) c.size += entry.Size() + c.entryNum += entry.EntryNum() return nil } @@ -143,3 +175,7 @@ func (c *cacheBlock[T]) ListAfter(ts uint64) []T { } return c.data[idx:] } + +func (c *cacheBlock[T]) Size() (entryNum, memorySize int64) { + return c.entryNum, c.size +} diff --git a/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go index 2c8ed0f5a02d3..cef7a4a8461d2 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_buffer_test.go @@ -110,6 +110,9 @@ func (s *DoubleCacheBufferSuite) TestPut() { s.Equal(2, len(buffer.ListAfter(11))) s.Equal(1, len(buffer.ListAfter(12))) + entryNum, memorySize := buffer.Size() + s.EqualValues(2, entryNum) + s.EqualValues(304, memorySize) buffer.Put(&Item{ Ts: 13, @@ -128,6 +131,9 @@ func (s *DoubleCacheBufferSuite) TestPut() { s.Equal(2, len(buffer.ListAfter(11))) s.Equal(2, len(buffer.ListAfter(12))) s.Equal(1, len(buffer.ListAfter(13))) + entryNum, memorySize = buffer.Size() + s.EqualValues(2, entryNum) + s.EqualValues(304, memorySize) } func TestDoubleCacheDeleteBuffer(t *testing.T) { diff --git a/internal/querynodev2/delegator/deletebuffer/delete_item.go b/internal/querynodev2/delegator/deletebuffer/delete_item.go index abc89baa0c0f9..6381b200f8d2a 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_item.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_item.go @@ -24,6 +24,12 @@ func (item *Item) Size() int64 { }, int64(0)) } +func (item *Item) EntryNum() int64 { + return lo.Reduce(item.Data, func(entryNum int64, item BufferItem, _ int) int64 { + return entryNum + item.EntryNum() + }, int64(0)) +} + type BufferItem struct { PartitionID int64 DeleteData storage.DeleteData @@ -37,3 +43,7 @@ func (item *BufferItem) Size() int64 { return int64(96) + pkSize + int64(8*len(item.DeleteData.Tss)) } + +func (item *BufferItem) EntryNum() int64 { + return int64(len(item.DeleteData.Pks)) +} diff --git a/internal/querynodev2/delegator/deletebuffer/delete_item_test.go b/internal/querynodev2/delegator/deletebuffer/delete_item_test.go index 59bf9d979337f..e58abda62fde4 100644 --- a/internal/querynodev2/delegator/deletebuffer/delete_item_test.go +++ b/internal/querynodev2/delegator/deletebuffer/delete_item_test.go @@ -15,9 +15,12 @@ func TestDeleteBufferItem(t *testing.T) { } assert.Equal(t, int64(96), item.Size()) + assert.EqualValues(t, 0, item.EntryNum()) item.DeleteData.Pks = []storage.PrimaryKey{ storage.NewInt64PrimaryKey(10), } item.DeleteData.Tss = []uint64{2000} + assert.Equal(t, int64(120), item.Size()) + assert.EqualValues(t, 1, item.EntryNum()) } diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go index 400a35cfd692c..8991f35bc5391 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go @@ -92,3 +92,15 @@ func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) { b.list = b.list[nextHead:] } } + +func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) { + b.mut.RLock() + defer b.mut.RUnlock() + + for _, block := range b.list { + blockNum, blockSize := block.Size() + entryNum += blockNum + memorySize += blockSize + } + return entryNum, memorySize +} diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go index fc67b170e4e66..84f6d67bc47c4 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go @@ -62,6 +62,9 @@ func (s *ListDeleteBufferSuite) TestCache() { s.Equal(2, len(buffer.ListAfter(11))) s.Equal(1, len(buffer.ListAfter(12))) + entryNum, memorySize := buffer.Size() + s.EqualValues(0, entryNum) + s.EqualValues(192, memorySize) } func (s *ListDeleteBufferSuite) TestTryDiscard() { @@ -95,18 +98,32 @@ func (s *ListDeleteBufferSuite) TestTryDiscard() { }) s.Equal(2, len(buffer.ListAfter(10))) + entryNum, memorySize := buffer.Size() + s.EqualValues(2, entryNum) + s.EqualValues(240, memorySize) buffer.TryDiscard(10) s.Equal(2, len(buffer.ListAfter(10)), "equal ts shall not discard block") + entryNum, memorySize = buffer.Size() + s.EqualValues(2, entryNum) + s.EqualValues(240, memorySize) buffer.TryDiscard(9) s.Equal(2, len(buffer.ListAfter(10)), "history ts shall not discard any block") + entryNum, memorySize = buffer.Size() + s.EqualValues(2, entryNum) + s.EqualValues(240, memorySize) buffer.TryDiscard(20) s.Equal(1, len(buffer.ListAfter(10)), "first block shall be discarded") + entryNum, memorySize = buffer.Size() + s.EqualValues(1, entryNum) + s.EqualValues(120, memorySize) buffer.TryDiscard(20) s.Equal(1, len(buffer.ListAfter(10)), "discard will not happen if there is only one block") + s.EqualValues(1, entryNum) + s.EqualValues(120, memorySize) } func TestListDeleteBuffer(t *testing.T) { diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index dcfa997ad01f1..4cc49950a95b3 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -134,6 +134,57 @@ func (_c *MockShardDelegator_Collection_Call) RunAndReturn(run func() int64) *Mo return _c } +// GetDeleteBufferSize provides a mock function with given fields: +func (_m *MockShardDelegator) GetDeleteBufferSize() (int64, int64) { + ret := _m.Called() + + var r0 int64 + var r1 int64 + if rf, ok := ret.Get(0).(func() (int64, int64)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func() int64); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(int64) + } + + return r0, r1 +} + +// MockShardDelegator_GetDeleteBufferSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDeleteBufferSize' +type MockShardDelegator_GetDeleteBufferSize_Call struct { + *mock.Call +} + +// GetDeleteBufferSize is a helper method to define mock.On call +func (_e *MockShardDelegator_Expecter) GetDeleteBufferSize() *MockShardDelegator_GetDeleteBufferSize_Call { + return &MockShardDelegator_GetDeleteBufferSize_Call{Call: _e.mock.On("GetDeleteBufferSize")} +} + +func (_c *MockShardDelegator_GetDeleteBufferSize_Call) Run(run func()) *MockShardDelegator_GetDeleteBufferSize_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockShardDelegator_GetDeleteBufferSize_Call) Return(entryNum int64, memorySize int64) *MockShardDelegator_GetDeleteBufferSize_Call { + _c.Call.Return(entryNum, memorySize) + return _c +} + +func (_c *MockShardDelegator_GetDeleteBufferSize_Call) RunAndReturn(run func() (int64, int64)) *MockShardDelegator_GetDeleteBufferSize_Call { + _c.Call.Return(run) + return _c +} + // GetPartitionStatsVersions provides a mock function with given fields: ctx func (_m *MockShardDelegator) GetPartitionStatsVersions(ctx context.Context) map[int64]int64 { ret := _m.Called(ctx) diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index bbd8b61913182..f60a07e036f9c 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/querynodev2/collector" + "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -125,6 +126,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error ).Set(float64(numEntities)) } + deleteBufferNum := make(map[int64]int64) + deleteBufferSize := make(map[int64]int64) + + node.delegators.Range(func(_ string, sd delegator.ShardDelegator) bool { + collectionID := sd.Collection() + entryNum, memorySize := sd.GetDeleteBufferSize() + deleteBufferNum[collectionID] += entryNum + deleteBufferSize[collectionID] += memorySize + return true + }) + return &metricsinfo.QueryNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, Rms: rms, @@ -138,6 +150,10 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error NodeID: node.GetNodeID(), CollectionIDs: collections, }, + DeleteBufferInfo: metricsinfo.DeleteBufferInfo{ + CollectionDeleteBufferNum: deleteBufferNum, + CollectionDeleteBufferSize: deleteBufferSize, + }, }, nil } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index b232bf555b7d6..8a8225ea0775c 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1749,9 +1749,32 @@ func (suite *ServiceSuite) TestGetMetric_Normal() { Request: string(mReq), } + sd1 := delegator.NewMockShardDelegator(suite.T()) + sd1.EXPECT().Collection().Return(100) + sd1.EXPECT().GetDeleteBufferSize().Return(10, 1000) + sd1.EXPECT().Close().Maybe() + suite.node.delegators.Insert("qn_unitest_dml_0_100v0", sd1) + + sd2 := delegator.NewMockShardDelegator(suite.T()) + sd2.EXPECT().Collection().Return(100) + sd2.EXPECT().GetDeleteBufferSize().Return(10, 1000) + sd2.EXPECT().Close().Maybe() + suite.node.delegators.Insert("qn_unitest_dml_1_100v1", sd2) + resp, err := suite.node.GetMetrics(ctx, req) + err = merr.CheckRPCCall(resp, err) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + + info := &metricsinfo.QueryNodeInfos{} + err = metricsinfo.UnmarshalComponentInfos(resp.GetResponse(), info) + suite.NoError(err) + + entryNum, ok := info.QuotaMetrics.DeleteBufferInfo.CollectionDeleteBufferNum[100] + suite.True(ok) + suite.EqualValues(20, entryNum) + memorySize, ok := info.QuotaMetrics.DeleteBufferInfo.CollectionDeleteBufferSize[100] + suite.True(ok) + suite.EqualValues(2000, memorySize) } func (suite *ServiceSuite) TestGetMetric_Failed() { diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 4d90be4fe342d..bf30e1cf06471 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -757,6 +757,10 @@ func (q *QuotaCenter) calculateWriteRates() error { updateCollectionFactor(growingSegFactors) l0Factors := q.getL0SegmentsSizeFactor() updateCollectionFactor(l0Factors) + deleteBufferRowCountFactors := q.getDeleteBufferRowCountFactor() + updateCollectionFactor(deleteBufferRowCountFactors) + deleteBufferSizeFactors := q.getDeleteBufferSizeFactor() + updateCollectionFactor(deleteBufferSizeFactors) ttCollections := make([]int64, 0) memoryCollections := make([]int64, 0) @@ -1034,6 +1038,61 @@ func (q *QuotaCenter) getL0SegmentsSizeFactor() map[int64]float64 { return collectionFactor } +func (q *QuotaCenter) getDeleteBufferRowCountFactor() map[int64]float64 { + if !Params.QuotaConfig.DeleteBufferRowCountProtectionEnabled.GetAsBool() { + return nil + } + + deleteBufferRowCountLowWaterLevel := Params.QuotaConfig.DeleteBufferRowCountLowWaterLevel.GetAsInt64() + deleteBufferRowCountHighWaterLevel := Params.QuotaConfig.DeleteBufferRowCountHighWaterLevel.GetAsInt64() + + deleteBufferNum := make(map[int64]int64) + for _, queryNodeMetrics := range q.queryNodeMetrics { + for collectionID, num := range queryNodeMetrics.DeleteBufferInfo.CollectionDeleteBufferNum { + deleteBufferNum[collectionID] += num + } + for collectionID, size := range queryNodeMetrics.DeleteBufferInfo.CollectionDeleteBufferSize { + deleteBufferNum[collectionID] += size + } + } + + collectionFactor := make(map[int64]float64) + for collID, rowCount := range map[int64]int64{100: 1000} { + if rowCount < deleteBufferRowCountLowWaterLevel { + continue + } + factor := float64(deleteBufferRowCountHighWaterLevel-rowCount) / float64(deleteBufferRowCountHighWaterLevel-deleteBufferRowCountLowWaterLevel) + collectionFactor[collID] = factor + } + return collectionFactor +} + +func (q *QuotaCenter) getDeleteBufferSizeFactor() map[int64]float64 { + if !Params.QuotaConfig.DeleteBufferSizeProtectionEnabled.GetAsBool() { + return nil + } + + deleteBufferRowCountLowWaterLevel := Params.QuotaConfig.DeleteBufferSizeLowWaterLevel.GetAsInt64() + deleteBufferRowCountHighWaterLevel := Params.QuotaConfig.DeleteBufferSizeHighWaterLevel.GetAsInt64() + + deleteBufferSize := make(map[int64]int64) + for _, queryNodeMetrics := range q.queryNodeMetrics { + for collectionID, size := range queryNodeMetrics.DeleteBufferInfo.CollectionDeleteBufferSize { + deleteBufferSize[collectionID] += size + } + } + + collectionFactor := make(map[int64]float64) + for collID, rowCount := range map[int64]int64{100: 1000} { + if rowCount < deleteBufferRowCountLowWaterLevel { + continue + } + factor := float64(deleteBufferRowCountHighWaterLevel-rowCount) / float64(deleteBufferRowCountHighWaterLevel-deleteBufferRowCountLowWaterLevel) + collectionFactor[collID] = factor + } + return collectionFactor +} + // calculateRates calculates target rates by different strategies. func (q *QuotaCenter) calculateRates() error { err := q.resetAllCurrentRates() diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go index 108a5e50a00a1..1c29f6eb363b5 100644 --- a/pkg/util/metricsinfo/quota_metric.go +++ b/pkg/util/metricsinfo/quota_metric.go @@ -62,6 +62,12 @@ type QueryNodeQuotaMetrics struct { Fgm FlowGraphMetric GrowingSegmentsSize int64 Effect NodeEffect + DeleteBufferInfo DeleteBufferInfo +} + +type DeleteBufferInfo struct { + CollectionDeleteBufferNum map[int64]int64 + CollectionDeleteBufferSize map[int64]int64 } type DataCoordQuotaMetrics struct { diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 73516fbb13ca3..94ea9c807c520 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -134,26 +134,32 @@ type quotaConfig struct { MaxResourceGroupNumOfQueryNode ParamItem `refreshable:"true"` // limit writing - ForceDenyWriting ParamItem `refreshable:"true"` - TtProtectionEnabled ParamItem `refreshable:"true"` - MaxTimeTickDelay ParamItem `refreshable:"true"` - MemProtectionEnabled ParamItem `refreshable:"true"` - DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` - DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` - QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` - QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` - GrowingSegmentsSizeProtectionEnabled ParamItem `refreshable:"true"` - GrowingSegmentsSizeMinRateRatio ParamItem `refreshable:"true"` - GrowingSegmentsSizeLowWaterLevel ParamItem `refreshable:"true"` - GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"` - DiskProtectionEnabled ParamItem `refreshable:"true"` - DiskQuota ParamItem `refreshable:"true"` - DiskQuotaPerDB ParamItem `refreshable:"true"` - DiskQuotaPerCollection ParamItem `refreshable:"true"` - DiskQuotaPerPartition ParamItem `refreshable:"true"` - L0SegmentRowCountProtectionEnabled ParamItem `refreshable:"true"` - L0SegmentRowCountLowWaterLevel ParamItem `refreshable:"true"` - L0SegmentRowCountHighWaterLevel ParamItem `refreshable:"true"` + ForceDenyWriting ParamItem `refreshable:"true"` + TtProtectionEnabled ParamItem `refreshable:"true"` + MaxTimeTickDelay ParamItem `refreshable:"true"` + MemProtectionEnabled ParamItem `refreshable:"true"` + DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` + DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` + QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` + QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` + GrowingSegmentsSizeProtectionEnabled ParamItem `refreshable:"true"` + GrowingSegmentsSizeMinRateRatio ParamItem `refreshable:"true"` + GrowingSegmentsSizeLowWaterLevel ParamItem `refreshable:"true"` + GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"` + DiskProtectionEnabled ParamItem `refreshable:"true"` + DiskQuota ParamItem `refreshable:"true"` + DiskQuotaPerDB ParamItem `refreshable:"true"` + DiskQuotaPerCollection ParamItem `refreshable:"true"` + DiskQuotaPerPartition ParamItem `refreshable:"true"` + L0SegmentRowCountProtectionEnabled ParamItem `refreshable:"true"` + L0SegmentRowCountLowWaterLevel ParamItem `refreshable:"true"` + L0SegmentRowCountHighWaterLevel ParamItem `refreshable:"true"` + DeleteBufferRowCountProtectionEnabled ParamItem `refreshable:"true"` + DeleteBufferRowCountLowWaterLevel ParamItem `refreshable:"true"` + DeleteBufferRowCountHighWaterLevel ParamItem `refreshable:"true"` + DeleteBufferSizeProtectionEnabled ParamItem `refreshable:"true"` + DeleteBufferSizeLowWaterLevel ParamItem `refreshable:"true"` + DeleteBufferSizeHighWaterLevel ParamItem `refreshable:"true"` // limit reading ForceDenyReading ParamItem `refreshable:"true"` @@ -1906,6 +1912,60 @@ but the rate will not be lower than minRateRatio * dmlRate.`, } p.L0SegmentRowCountHighWaterLevel.Init(base.mgr) + p.DeleteBufferRowCountProtectionEnabled = ParamItem{ + Key: "quotaAndLimits.limitWriting.deleteBufferRowCountProtection.enabled", + Version: "2.4.11", + DefaultValue: "false", + Doc: "switch to enable delete buffer row count quota", + Export: true, + } + p.DeleteBufferRowCountProtectionEnabled.Init(base.mgr) + + p.DeleteBufferRowCountLowWaterLevel = ParamItem{ + Key: "quotaAndLimits.limitWriting.deleteBufferRowCountProtection.lowWaterLevel", + Version: "2.4.11", + DefaultValue: "32768", + Doc: "delete buffer row count quota, low water level", + Export: true, + } + p.DeleteBufferRowCountLowWaterLevel.Init(base.mgr) + + p.DeleteBufferRowCountHighWaterLevel = ParamItem{ + Key: "quotaAndLimits.limitWriting.deleteBufferRowCountProtection.highWaterLevel", + Version: "2.4.11", + DefaultValue: "65536", + Doc: "delete buffer row count quota, high water level", + Export: true, + } + p.DeleteBufferRowCountHighWaterLevel.Init(base.mgr) + + p.DeleteBufferSizeProtectionEnabled = ParamItem{ + Key: "quotaAndLimits.limitWriting.deleteBufferSizeProtection.enabled", + Version: "2.4.11", + DefaultValue: "false", + Doc: "switch to enable delete buffer size quota", + Export: true, + } + p.DeleteBufferSizeProtectionEnabled.Init(base.mgr) + + p.DeleteBufferSizeLowWaterLevel = ParamItem{ + Key: "quotaAndLimits.limitWriting.deleteBufferSizeProtection.lowWaterLevel", + Version: "2.4.11", + DefaultValue: "134217728", // 128MB + Doc: "delete buffer size quota, low water level", + Export: true, + } + p.DeleteBufferSizeLowWaterLevel.Init(base.mgr) + + p.DeleteBufferSizeHighWaterLevel = ParamItem{ + Key: "quotaAndLimits.limitWriting.deleteBufferSizeProtection.highWaterLevel", + Version: "2.4.11", + DefaultValue: "268435456", // 256MB + Doc: "delete buffer size quota, high water level", + Export: true, + } + p.DeleteBufferSizeHighWaterLevel.Init(base.mgr) + // limit reading p.ForceDenyReading = ParamItem{ Key: "quotaAndLimits.limitReading.forceDeny",