enhance: Add l0 segment entry num quota (milvus-io#34733)
See also milvus-io#34670

This PR adds a quota configuration for the L0 segment entry number per
collection. If L0 compaction cannot keep up with the insertion/upsertion
rate, this feature can apply back pressure to the related write rates.
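
For reference, the new knobs live under quotaAndLimits.limitWriting in the
Milvus YAML configuration. A minimal sketch with this PR's values (key paths
and defaults are taken verbatim from the quota_param.go change below; the
surrounding layout is the usual milvus.yaml quota section):

    quotaAndLimits:
      limitWriting:
        l0SegmentsRowCountProtection:
          enabled: true          # default is false; switch on to activate the quota
          lowWaterLevel: 32768   # back pressure starts above this per-collection L0 entry count
          highWaterLevel: 65536  # write rates approach zero as the count nears this level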

---------

Signed-off-by: Congqi Xia <[email protected]>
congqixia authored Jul 17, 2024
1 parent aa5418a commit 67324eb
Showing 8 changed files with 115 additions and 27 deletions.
3 changes: 2 additions & 1 deletion internal/datacoord/import_util.go
@@ -284,7 +284,8 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error)
 	}
 
 	err := merr.WrapErrServiceQuotaExceeded("disk quota exceeded, please allocate more resources")
-	totalUsage, collectionsUsage, _ := meta.GetCollectionBinlogSize()
+	quotaInfo := meta.GetQuotaInfo()
+	totalUsage, collectionsUsage := quotaInfo.TotalBinlogSize, quotaInfo.CollectionBinlogSize
 
 	tasks := imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(PreImportTaskType))
 	files := make([]*datapb.ImportFileStats, 0)
20 changes: 17 additions & 3 deletions internal/datacoord/meta.go
@@ -46,6 +46,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -383,13 +384,16 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
 	return m.getNumRowsOfCollectionUnsafe(collectionID)
 }
 
-// GetCollectionBinlogSize returns the total binlog size and binlog size of collections.
-func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueID]map[UniqueID]int64) {
+func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
+	info := &metricsinfo.DataCoordQuotaMetrics{}
 	m.RLock()
 	defer m.RUnlock()
 	collectionBinlogSize := make(map[UniqueID]int64)
 	partitionBinlogSize := make(map[UniqueID]map[UniqueID]int64)
 	collectionRowsNum := make(map[UniqueID]map[commonpb.SegmentState]int64)
+	// collection id => l0 delta entry count
+	collectionL0RowCounts := make(map[UniqueID]int64)
 
 	segments := m.segments.GetSegments()
 	var total int64
 	for _, segment := range segments {
@@ -417,6 +421,10 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueID]map[UniqueID]int64) {
 				collectionRowsNum[segment.GetCollectionID()] = make(map[commonpb.SegmentState]int64)
 			}
 			collectionRowsNum[segment.GetCollectionID()][segment.GetState()] += segment.GetNumOfRows()
+
+			if segment.GetLevel() == datapb.SegmentLevel_L0 {
+				collectionL0RowCounts[segment.GetCollectionID()] += segment.getDeltaCount()
+			}
 		}
 	}
@@ -429,7 +437,13 @@
 			}
 		}
 	}
-	return total, collectionBinlogSize, partitionBinlogSize
+
+	info.TotalBinlogSize = total
+	info.CollectionBinlogSize = collectionBinlogSize
+	info.PartitionsBinlogSize = partitionBinlogSize
+	info.CollectionL0RowCount = collectionL0RowCounts
+
+	return info
 }
 
 // GetCollectionIndexFilesSize returns the total index files size of all segment for each collection.
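
Note that GetQuotaInfo subsumes the old GetCollectionBinlogSize triple return:
all quota-related usage is now aggregated in a single pass under the meta read
lock, and the same pass tallies per-collection L0 delta entry counts (via
getDeltaCount, added below) for the QuotaCenter to consume.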
16 changes: 8 additions & 8 deletions internal/datacoord/meta_test.go
@@ -589,16 +589,16 @@ func TestMeta_Basic(t *testing.T) {
 		assert.NoError(t, err)
 
 		// check TotalBinlogSize
-		total, collectionBinlogSize, _ := meta.GetCollectionBinlogSize()
-		assert.Len(t, collectionBinlogSize, 1)
-		assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID])
-		assert.Equal(t, int64(size0+size1), total)
+		quotaInfo := meta.GetQuotaInfo()
+		assert.Len(t, quotaInfo.CollectionBinlogSize, 1)
+		assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID])
+		assert.Equal(t, int64(size0+size1), quotaInfo.TotalBinlogSize)
 
 		meta.collections[collID] = collInfo
-		total, collectionBinlogSize, _ = meta.GetCollectionBinlogSize()
-		assert.Len(t, collectionBinlogSize, 1)
-		assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID])
-		assert.Equal(t, int64(size0+size1), total)
+		quotaInfo = meta.GetQuotaInfo()
+		assert.Len(t, quotaInfo.CollectionBinlogSize, 1)
+		assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID])
+		assert.Equal(t, int64(size0+size1), quotaInfo.TotalBinlogSize)
 	})
 
 	t.Run("Test GetCollectionBinlogSize", func(t *testing.T) {
8 changes: 2 additions & 6 deletions internal/datacoord/metrics_info.go
@@ -37,14 +37,10 @@
 
 // getQuotaMetrics returns DataCoordQuotaMetrics.
 func (s *Server) getQuotaMetrics() *metricsinfo.DataCoordQuotaMetrics {
-	total, colSizes, partSizes := s.meta.GetCollectionBinlogSize()
+	info := s.meta.GetQuotaInfo()
 	// Just generate the metrics data regularly
 	_ = s.meta.GetCollectionIndexFilesSize()
-	return &metricsinfo.DataCoordQuotaMetrics{
-		TotalBinlogSize:      total,
-		CollectionBinlogSize: colSizes,
-		PartitionsBinlogSize: partSizes,
-	}
+	return info
 }
 
 func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoordCollectionMetrics {
34 changes: 28 additions & 6 deletions internal/datacoord/segment_info.go
@@ -53,6 +53,7 @@ type SegmentInfo struct {
 	isCompacting bool
 	// a cache to avoid calculate twice
 	size            atomic.Int64
+	deltaRowcount   atomic.Int64
 	lastWrittenTime time.Time
 }
@@ -61,14 +62,20 @@ type SegmentInfo struct {
 // Note that the allocation information is not preserved,
 // the worst case scenario is to have a segment with twice size we expects
 func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
-	return &SegmentInfo{
-		SegmentInfo:   info,
-		currRows:      info.GetNumOfRows(),
-		allocations:   make([]*Allocation, 0, 16),
-		lastFlushTime: time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)),
+	s := &SegmentInfo{
+		SegmentInfo: info,
+		currRows:    info.GetNumOfRows(),
+	}
+	// setup growing fields
+	if s.GetState() == commonpb.SegmentState_Growing {
+		s.allocations = make([]*Allocation, 0, 16)
+		s.lastFlushTime = time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second))
 		// A growing segment from recovery can be also considered idle.
-		lastWrittenTime: getZeroTime(),
+		s.lastWrittenTime = getZeroTime()
 	}
+	// mark as uninitialized
+	s.deltaRowcount.Store(-1)
+	return s
 }
@@ -330,6 +337,7 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
 		lastWrittenTime: s.lastWrittenTime,
 	}
 	cloned.size.Store(s.size.Load())
+	cloned.deltaRowcount.Store(s.deltaRowcount.Load())
 
 	for _, opt := range opts {
 		opt(cloned)
@@ -492,5 +500,19 @@ func (s *SegmentInfo) getSegmentSize() int64 {
 	return s.size.Load()
 }
 
+func (s *SegmentInfo) getDeltaCount() int64 {
+	if s.deltaRowcount.Load() < 0 {
+		var rc int64
+		for _, deltaLogs := range s.GetDeltalogs() {
+			for _, l := range deltaLogs.GetBinlogs() {
+				rc += l.GetEntriesNum()
+			}
+		}
+		s.deltaRowcount.Store(rc)
+	}
+	r := s.deltaRowcount.Load()
+	return r
+}
+
 // SegmentInfoSelector is the function type to select SegmentInfo from meta
 type SegmentInfoSelector func(*SegmentInfo) bool
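
The delta entry count is computed lazily and cached: NewSegmentInfo stores -1
as an uninitialized sentinel, getDeltaCount fills the cache on first use by
summing GetEntriesNum() over all delta binlogs, and ShadowClone carries the
cached value over, so the per-collection L0 tally in GetQuotaInfo stays cheap.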
29 changes: 26 additions & 3 deletions internal/rootcoord/quota_center.go
@@ -270,13 +270,14 @@ func getRateTypes(scope internalpb.RateScope, opType opType) typeutil.Set[internalpb.RateType] {
 
 func (q *QuotaCenter) Start() {
 	q.wg.Add(1)
-	go q.run()
+	go func() {
+		defer q.wg.Done()
+		q.run()
+	}()
 }
 
 // run starts the service of QuotaCenter.
 func (q *QuotaCenter) run() {
-	defer q.wg.Done()
-
 	interval := Params.QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second)
 	log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval))
 	ticker := time.NewTicker(interval)
@@ -957,6 +958,8 @@ func (q *QuotaCenter) calculateWriteRates() error {
 	updateCollectionFactor(memFactors)
 	growingSegFactors := q.getGrowingSegmentsSizeFactor()
 	updateCollectionFactor(growingSegFactors)
+	l0Factors := q.getL0SegmentsSizeFactor()
+	updateCollectionFactor(l0Factors)
 
 	ttCollections := make([]int64, 0)
 	memoryCollections := make([]int64, 0)
@@ -1214,6 +1217,26 @@ func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 {
 	return collectionFactor
 }
 
+// getL0SegmentsSizeFactor checks whether any collection's L0 entry count exceeds the configured water levels.
+func (q *QuotaCenter) getL0SegmentsSizeFactor() map[int64]float64 {
+	if !Params.QuotaConfig.L0SegmentRowCountProtectionEnabled.GetAsBool() {
+		return nil
+	}
+
+	l0SegmentSizeLowWaterLevel := Params.QuotaConfig.L0SegmentRowCountLowWaterLevel.GetAsInt64()
+	l0SegmentSizeHighWaterLevel := Params.QuotaConfig.L0SegmentRowCountHighWaterLevel.GetAsInt64()
+
+	collectionFactor := make(map[int64]float64)
+	for collectionID, l0RowCount := range q.dataCoordMetrics.CollectionL0RowCount {
+		if l0RowCount < l0SegmentSizeLowWaterLevel {
+			continue
+		}
+		factor := float64(l0SegmentSizeHighWaterLevel-l0RowCount) / float64(l0SegmentSizeHighWaterLevel-l0SegmentSizeLowWaterLevel)
+		collectionFactor[collectionID] = factor
+	}
+	return collectionFactor
+}
+
 // calculateRates calculates target rates by different strategies.
 func (q *QuotaCenter) calculateRates() error {
 	err := q.resetAllCurrentRates()
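
For intuition, here is a minimal standalone sketch of the water-level
interpolation above (a hypothetical reproduction, not the QuotaCenter code
itself), using this PR's default levels. At an L0 entry count of 49152 the
factor is (65536-49152)/(65536-32768) = 0.5, i.e. the write rate is roughly
halved; how zero or negative factors are applied is decided downstream by
updateCollectionFactor, which is not part of this diff.

package main

import "fmt"

// l0Factor mirrors the interpolation in getL0SegmentsSizeFactor:
// below the low water level there is no throttling; between the levels
// the factor falls linearly from 1.0 to 0.0; above the high level it
// goes negative (effectively a full write denial downstream).
func l0Factor(l0RowCount, low, high int64) (float64, bool) {
	if l0RowCount < low {
		return 1.0, false
	}
	return float64(high-l0RowCount) / float64(high-low), true
}

func main() {
	for _, count := range []int64{10000, 32768, 49152, 65536, 80000} {
		f, throttled := l0Factor(count, 32768, 65536)
		fmt.Printf("l0RowCount=%6d throttled=%-5v factor=%.2f\n", count, throttled, f)
	}
}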
2 changes: 2 additions & 0 deletions pkg/util/metricsinfo/quota_metric.go
@@ -88,6 +88,8 @@ type DataCoordQuotaMetrics struct {
 	TotalBinlogSize      int64
 	CollectionBinlogSize map[int64]int64
 	PartitionsBinlogSize map[int64]map[int64]int64
+	// l0 segments
+	CollectionL0RowCount map[int64]int64
 }
 
 // DataNodeQuotaMetrics are metrics of DataNode.
30 changes: 30 additions & 0 deletions pkg/util/paramtable/quota_param.go
@@ -152,6 +152,9 @@ type quotaConfig struct {
 	DiskQuotaPerDB         ParamItem `refreshable:"true"`
 	DiskQuotaPerCollection ParamItem `refreshable:"true"`
 	DiskQuotaPerPartition  ParamItem `refreshable:"true"`
+	L0SegmentRowCountProtectionEnabled ParamItem `refreshable:"true"`
+	L0SegmentRowCountLowWaterLevel     ParamItem `refreshable:"true"`
+	L0SegmentRowCountHighWaterLevel    ParamItem `refreshable:"true"`
 
 	// limit reading
 	ForceDenyReading ParamItem `refreshable:"true"`
@@ -1856,6 +1859,33 @@ but the rate will not be lower than minRateRatio * dmlRate.`,
 	}
 	p.DiskQuotaPerPartition.Init(base.mgr)
 
+	p.L0SegmentRowCountProtectionEnabled = ParamItem{
+		Key:          "quotaAndLimits.limitWriting.l0SegmentsRowCountProtection.enabled",
+		Version:      "2.4.7",
+		DefaultValue: "false",
+		Doc:          "switch to enable l0 segment row count quota",
+		Export:       true,
+	}
+	p.L0SegmentRowCountProtectionEnabled.Init(base.mgr)
+
+	p.L0SegmentRowCountLowWaterLevel = ParamItem{
+		Key:          "quotaAndLimits.limitWriting.l0SegmentsRowCountProtection.lowWaterLevel",
+		Version:      "2.4.7",
+		DefaultValue: "32768",
+		Doc:          "l0 segment row count quota, low water level",
+		Export:       true,
+	}
+	p.L0SegmentRowCountLowWaterLevel.Init(base.mgr)
+
+	p.L0SegmentRowCountHighWaterLevel = ParamItem{
+		Key:          "quotaAndLimits.limitWriting.l0SegmentsRowCountProtection.highWaterLevel",
+		Version:      "2.4.7",
+		DefaultValue: "65536",
+		Doc:          "l0 segment row count quota, high water level",
+		Export:       true,
+	}
+	p.L0SegmentRowCountHighWaterLevel.Init(base.mgr)
+
 	// limit reading
 	p.ForceDenyReading = ParamItem{
 		Key: "quotaAndLimits.limitReading.forceDeny",
