Skip to content

Commit

Permalink
enhance: Add delete buffer related quota logic (#35918)
Browse files Browse the repository at this point in the history
See also #35303

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Sep 5, 2024
1 parent 80a9efd commit 8593c45
Show file tree
Hide file tree
Showing 14 changed files with 344 additions and 32 deletions.
8 changes: 8 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,14 @@ quotaAndLimits:
enabled: false # switch to enable l0 segment row count quota
lowWaterLevel: 32768 # l0 segment row count quota, low water level
highWaterLevel: 65536 # l0 segment row count quota, low water level
deleteBufferRowCountProtection:
enabled: false # switch to enable delete buffer row count quota
lowWaterLevel: 32768 # delete buffer row count quota, low water level
highWaterLevel: 65536 # delete buffer row count quota, high water level
deleteBufferSizeProtection:
enabled: false # switch to enable delete buffer size quota
lowWaterLevel: 134217728 # delete buffer size quota, low water level
highWaterLevel: 268435456 # delete buffer size quota, high water level
limitReading:
# forceDeny false means dql requests are allowed (except for some
# specific conditions, such as collection has been dropped), true means always reject all dql requests.
Expand Down
5 changes: 5 additions & 0 deletions internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type ShardDelegator interface {
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
GetTargetVersion() int64
GetDeleteBufferSize() (entryNum int64, memorySize int64)

// manage exclude segments
AddExcludedSegments(excludeInfo map[int64]uint64)
Expand Down Expand Up @@ -588,6 +589,10 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta
return results, nil
}

func (sd *shardDelegator) GetDeleteBufferSize() (entryNum int64, memorySize int64) {
return sd.deleteBuffer.Size()
}

type subTask[T any] struct {
req T
targetID int64
Expand Down
58 changes: 47 additions & 11 deletions internal/querynodev2/delegator/deletebuffer/delete_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var errBufferFull = errors.New("buffer full")
type timed interface {
Timestamp() uint64
Size() int64
EntryNum() int64
}

// DeleteBuffer is the interface for delete buffer.
Expand All @@ -36,6 +37,8 @@ type DeleteBuffer[T timed] interface {
ListAfter(uint64) []T
SafeTs() uint64
TryDiscard(uint64)
// Size returns current size information of delete buffer: entryNum and memory
Size() (entryNum, memorySize int64)
}

func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
Expand Down Expand Up @@ -86,31 +89,59 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T {
return result
}

func (c *doubleCacheBuffer[T]) Size() (entryNum int64, memorySize int64) {
c.mut.RLock()
defer c.mut.RUnlock()

if c.head != nil {
blockNum, blockSize := c.head.Size()
entryNum += blockNum
memorySize += blockSize
}

if c.tail != nil {
blockNum, blockSize := c.tail.Size()
entryNum += blockNum
memorySize += blockSize
}

return entryNum, memorySize
}

// evict sets head as tail and evicts tail.
func (c *doubleCacheBuffer[T]) evict(newTs uint64, entry T) {
c.tail = c.head
c.head = &cacheBlock[T]{
headTs: newTs,
maxSize: c.maxSize / 2,
size: entry.Size(),
data: []T{entry},
headTs: newTs,
maxSize: c.maxSize / 2,
size: entry.Size(),
entryNum: entry.EntryNum(),
data: []T{entry},
}
c.ts = c.tail.headTs
}

func newCacheBlock[T timed](ts uint64, maxSize int64, elements ...T) *cacheBlock[T] {
var entryNum, memorySize int64
for _, element := range elements {
entryNum += element.EntryNum()
memorySize += element.Size()
}
return &cacheBlock[T]{
headTs: ts,
maxSize: maxSize,
data: elements,
headTs: ts,
maxSize: maxSize,
data: elements,
entryNum: entryNum,
size: memorySize,
}
}

type cacheBlock[T timed] struct {
mut sync.RWMutex
headTs uint64
size int64
maxSize int64
mut sync.RWMutex
headTs uint64
entryNum int64
size int64
maxSize int64

data []T
}
Expand All @@ -127,6 +158,7 @@ func (c *cacheBlock[T]) Put(entry T) error {

c.data = append(c.data, entry)
c.size += entry.Size()
c.entryNum += entry.EntryNum()
return nil
}

Expand All @@ -143,3 +175,7 @@ func (c *cacheBlock[T]) ListAfter(ts uint64) []T {
}
return c.data[idx:]
}

func (c *cacheBlock[T]) Size() (entryNum, memorySize int64) {
return c.entryNum, c.size
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (s *DoubleCacheBufferSuite) TestPut() {

s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(1, len(buffer.ListAfter(12)))
entryNum, memorySize := buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(304, memorySize)

buffer.Put(&Item{
Ts: 13,
Expand All @@ -128,6 +131,9 @@ func (s *DoubleCacheBufferSuite) TestPut() {
s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(2, len(buffer.ListAfter(12)))
s.Equal(1, len(buffer.ListAfter(13)))
entryNum, memorySize = buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(304, memorySize)
}

func TestDoubleCacheDeleteBuffer(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions internal/querynodev2/delegator/deletebuffer/delete_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func (item *Item) Size() int64 {
}, int64(0))
}

func (item *Item) EntryNum() int64 {
return lo.Reduce(item.Data, func(entryNum int64, item BufferItem, _ int) int64 {
return entryNum + item.EntryNum()
}, int64(0))
}

type BufferItem struct {
PartitionID int64
DeleteData storage.DeleteData
Expand All @@ -37,3 +43,7 @@ func (item *BufferItem) Size() int64 {

return int64(96) + pkSize + int64(8*len(item.DeleteData.Tss))
}

func (item *BufferItem) EntryNum() int64 {
return int64(len(item.DeleteData.Pks))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ func TestDeleteBufferItem(t *testing.T) {
}

assert.Equal(t, int64(96), item.Size())
assert.EqualValues(t, 0, item.EntryNum())

item.DeleteData.Pks = []storage.PrimaryKey{
storage.NewInt64PrimaryKey(10),
}
item.DeleteData.Tss = []uint64{2000}
assert.Equal(t, int64(120), item.Size())
assert.EqualValues(t, 1, item.EntryNum())
}
12 changes: 12 additions & 0 deletions internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,15 @@ func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
b.list = b.list[nextHead:]
}
}

func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) {
b.mut.RLock()
defer b.mut.RUnlock()

for _, block := range b.list {
blockNum, blockSize := block.Size()
entryNum += blockNum
memorySize += blockSize
}
return entryNum, memorySize
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (s *ListDeleteBufferSuite) TestCache() {

s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(1, len(buffer.ListAfter(12)))
entryNum, memorySize := buffer.Size()
s.EqualValues(0, entryNum)
s.EqualValues(192, memorySize)
}

func (s *ListDeleteBufferSuite) TestTryDiscard() {
Expand Down Expand Up @@ -95,18 +98,32 @@ func (s *ListDeleteBufferSuite) TestTryDiscard() {
})

s.Equal(2, len(buffer.ListAfter(10)))
entryNum, memorySize := buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(240, memorySize)

buffer.TryDiscard(10)
s.Equal(2, len(buffer.ListAfter(10)), "equal ts shall not discard block")
entryNum, memorySize = buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(240, memorySize)

buffer.TryDiscard(9)
s.Equal(2, len(buffer.ListAfter(10)), "history ts shall not discard any block")
entryNum, memorySize = buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(240, memorySize)

buffer.TryDiscard(20)
s.Equal(1, len(buffer.ListAfter(10)), "first block shall be discarded")
entryNum, memorySize = buffer.Size()
s.EqualValues(1, entryNum)
s.EqualValues(120, memorySize)

buffer.TryDiscard(20)
s.Equal(1, len(buffer.ListAfter(10)), "discard will not happen if there is only one block")
s.EqualValues(1, entryNum)
s.EqualValues(120, memorySize)
}

func TestListDeleteBuffer(t *testing.T) {
Expand Down
51 changes: 51 additions & 0 deletions internal/querynodev2/delegator/mock_delegator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions internal/querynodev2/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/hardware"
Expand Down Expand Up @@ -125,6 +126,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
).Set(float64(numEntities))
}

deleteBufferNum := make(map[int64]int64)
deleteBufferSize := make(map[int64]int64)

node.delegators.Range(func(_ string, sd delegator.ShardDelegator) bool {
collectionID := sd.Collection()
entryNum, memorySize := sd.GetDeleteBufferSize()
deleteBufferNum[collectionID] += entryNum
deleteBufferSize[collectionID] += memorySize
return true
})

return &metricsinfo.QueryNodeQuotaMetrics{
Hms: metricsinfo.HardwareMetrics{},
Rms: rms,
Expand All @@ -138,6 +150,10 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
NodeID: node.GetNodeID(),
CollectionIDs: collections,
},
DeleteBufferInfo: metricsinfo.DeleteBufferInfo{
CollectionDeleteBufferNum: deleteBufferNum,
CollectionDeleteBufferSize: deleteBufferSize,
},
}, nil
}

Expand Down
25 changes: 24 additions & 1 deletion internal/querynodev2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1749,9 +1749,32 @@ func (suite *ServiceSuite) TestGetMetric_Normal() {
Request: string(mReq),
}

sd1 := delegator.NewMockShardDelegator(suite.T())
sd1.EXPECT().Collection().Return(100)
sd1.EXPECT().GetDeleteBufferSize().Return(10, 1000)
sd1.EXPECT().Close().Maybe()
suite.node.delegators.Insert("qn_unitest_dml_0_100v0", sd1)

sd2 := delegator.NewMockShardDelegator(suite.T())
sd2.EXPECT().Collection().Return(100)
sd2.EXPECT().GetDeleteBufferSize().Return(10, 1000)
sd2.EXPECT().Close().Maybe()
suite.node.delegators.Insert("qn_unitest_dml_1_100v1", sd2)

resp, err := suite.node.GetMetrics(ctx, req)
err = merr.CheckRPCCall(resp, err)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())

info := &metricsinfo.QueryNodeInfos{}
err = metricsinfo.UnmarshalComponentInfos(resp.GetResponse(), info)
suite.NoError(err)

entryNum, ok := info.QuotaMetrics.DeleteBufferInfo.CollectionDeleteBufferNum[100]
suite.True(ok)
suite.EqualValues(20, entryNum)
memorySize, ok := info.QuotaMetrics.DeleteBufferInfo.CollectionDeleteBufferSize[100]
suite.True(ok)
suite.EqualValues(2000, memorySize)
}

func (suite *ServiceSuite) TestGetMetric_Failed() {
Expand Down
Loading

0 comments on commit 8593c45

Please sign in to comment.