diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index bfc30392f5296..3533c5981424b 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -284,7 +284,8 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error)
 	}
 
 	err := merr.WrapErrServiceQuotaExceeded("disk quota exceeded, please allocate more resources")
-	totalUsage, collectionsUsage, _ := meta.GetCollectionBinlogSize()
+	quotaInfo := meta.GetQuotaInfo()
+	totalUsage, collectionsUsage := quotaInfo.TotalBinlogSize, quotaInfo.CollectionBinlogSize
 
 	tasks := imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(PreImportTaskType))
 	files := make([]*datapb.ImportFileStats, 0)
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index dfb110eb2e798..352cf1d7fb6c7 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -46,6 +46,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -383,13 +384,16 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
 	return m.getNumRowsOfCollectionUnsafe(collectionID)
 }
 
-// GetCollectionBinlogSize returns the total binlog size and binlog size of collections.
-func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueID]map[UniqueID]int64) {
+func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
+	info := &metricsinfo.DataCoordQuotaMetrics{}
 	m.RLock()
 	defer m.RUnlock()
 	collectionBinlogSize := make(map[UniqueID]int64)
 	partitionBinlogSize := make(map[UniqueID]map[UniqueID]int64)
 	collectionRowsNum := make(map[UniqueID]map[commonpb.SegmentState]int64)
+	// collection id => l0 delta entry count
+	collectionL0RowCounts := make(map[UniqueID]int64)
+
 	segments := m.segments.GetSegments()
 	var total int64
 	for _, segment := range segments {
@@ -417,6 +421,10 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueI
 				collectionRowsNum[segment.GetCollectionID()] = make(map[commonpb.SegmentState]int64)
 			}
 			collectionRowsNum[segment.GetCollectionID()][segment.GetState()] += segment.GetNumOfRows()
+
+			if segment.GetLevel() == datapb.SegmentLevel_L0 {
+				collectionL0RowCounts[segment.GetCollectionID()] += segment.getDeltaCount()
+			}
 		}
 	}
 
@@ -429,7 +437,13 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64, map[UniqueI
 			}
 		}
 	}
-	return total, collectionBinlogSize, partitionBinlogSize
+
+	info.TotalBinlogSize = total
+	info.CollectionBinlogSize = collectionBinlogSize
+	info.PartitionsBinlogSize = partitionBinlogSize
+	info.CollectionL0RowCount = collectionL0RowCounts
+
+	return info
 }
 
 // GetCollectionIndexFilesSize returns the total index files size of all segment for each collection.
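The refactor above collapses the three return values of `GetCollectionBinlogSize` into a single `metricsinfo.DataCoordQuotaMetrics` snapshot, so callers take only the fields they need. A minimal sketch of a consumer, mirroring the `CheckDiskQuota` change (the helper name and the quota argument are hypothetical; `GetQuotaInfo` and the struct fields come from this diff):

```go
// Hypothetical helper showing how a caller reads the aggregated snapshot.
func checkCollectionDiskQuota(m *meta, collectionID UniqueID, quota int64) error {
	info := m.GetQuotaInfo() // one read lock, one pass over all segments
	if info.CollectionBinlogSize[collectionID] >= quota {
		return merr.WrapErrServiceQuotaExceeded("disk quota exceeded, please allocate more resources")
	}
	return nil
}
```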
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 94593313d1586..d78540f838e6c 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -589,16 +589,16 @@ func TestMeta_Basic(t *testing.T) {
 		assert.NoError(t, err)
 
 		// check TotalBinlogSize
-		total, collectionBinlogSize, _ := meta.GetCollectionBinlogSize()
-		assert.Len(t, collectionBinlogSize, 1)
-		assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID])
-		assert.Equal(t, int64(size0+size1), total)
+		quotaInfo := meta.GetQuotaInfo()
+		assert.Len(t, quotaInfo.CollectionBinlogSize, 1)
+		assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID])
+		assert.Equal(t, int64(size0+size1), quotaInfo.TotalBinlogSize)
 
 		meta.collections[collID] = collInfo
-		total, collectionBinlogSize, _ = meta.GetCollectionBinlogSize()
-		assert.Len(t, collectionBinlogSize, 1)
-		assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID])
-		assert.Equal(t, int64(size0+size1), total)
+		quotaInfo = meta.GetQuotaInfo()
+		assert.Len(t, quotaInfo.CollectionBinlogSize, 1)
+		assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID])
+		assert.Equal(t, int64(size0+size1), quotaInfo.TotalBinlogSize)
 	})
 
 	t.Run("Test GetCollectionBinlogSize", func(t *testing.T) {
diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go
index 19e516cf84721..b29c00bee7716 100644
--- a/internal/datacoord/metrics_info.go
+++ b/internal/datacoord/metrics_info.go
@@ -37,14 +37,10 @@ import (
 
 // getQuotaMetrics returns DataCoordQuotaMetrics.
 func (s *Server) getQuotaMetrics() *metricsinfo.DataCoordQuotaMetrics {
-	total, colSizes, partSizes := s.meta.GetCollectionBinlogSize()
+	info := s.meta.GetQuotaInfo()
 	// Just generate the metrics data regularly
 	_ = s.meta.GetCollectionIndexFilesSize()
-	return &metricsinfo.DataCoordQuotaMetrics{
-		TotalBinlogSize:      total,
-		CollectionBinlogSize: colSizes,
-		PartitionsBinlogSize: partSizes,
-	}
+	return info
 }
 
 func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoordCollectionMetrics {
diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go
index 8dcd183632b87..5ed9b8a42b4c5 100644
--- a/internal/datacoord/segment_info.go
+++ b/internal/datacoord/segment_info.go
@@ -53,6 +53,7 @@ type SegmentInfo struct {
 	isCompacting bool
 	// a cache to avoid calculate twice
 	size            atomic.Int64
+	deltaRowcount   atomic.Int64
 	lastWrittenTime time.Time
 }
 
@@ -61,14 +62,20 @@ type SegmentInfo struct {
 // Note that the allocation information is not preserved,
 // the worst case scenario is to have a segment with twice size we expects
 func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
-	return &SegmentInfo{
-		SegmentInfo:   info,
-		currRows:      info.GetNumOfRows(),
-		allocations:   make([]*Allocation, 0, 16),
-		lastFlushTime: time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second)),
+	s := &SegmentInfo{
+		SegmentInfo: info,
+		currRows:    info.GetNumOfRows(),
+	}
+	// set up growing-only fields
+	if s.GetState() == commonpb.SegmentState_Growing {
+		s.allocations = make([]*Allocation, 0, 16)
+		s.lastFlushTime = time.Now().Add(-1 * paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second))
 		// A growing segment from recovery can be also considered idle.
-		lastWrittenTime: getZeroTime(),
+		s.lastWrittenTime = getZeroTime()
 	}
+	// mark as uninitialized
+	s.deltaRowcount.Store(-1)
+	return s
 }
 
 // NewSegmentsInfo creates a `SegmentsInfo` instance, which makes sure internal map is initialized
@@ -330,6 +337,7 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
 		lastWrittenTime: s.lastWrittenTime,
 	}
 	cloned.size.Store(s.size.Load())
+	cloned.deltaRowcount.Store(s.deltaRowcount.Load())
 
 	for _, opt := range opts {
 		opt(cloned)
@@ -492,5 +500,19 @@ func (s *SegmentInfo) getSegmentSize() int64 {
 	return s.size.Load()
 }
 
+func (s *SegmentInfo) getDeltaCount() int64 {
+	if s.deltaRowcount.Load() < 0 {
+		var rc int64
+		for _, deltaLogs := range s.GetDeltalogs() {
+			for _, l := range deltaLogs.GetBinlogs() {
+				rc += l.GetEntriesNum()
+			}
+		}
+		s.deltaRowcount.Store(rc)
+	}
+	// return the cached value computed above or by an earlier caller
+	return s.deltaRowcount.Load()
+}
+
 // SegmentInfoSelector is the function type to select SegmentInfo from meta
 type SegmentInfoSelector func(*SegmentInfo) bool
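`NewSegmentInfo` now seeds `deltaRowcount` with -1 so that `getDeltaCount` can compute the delta-log entry total lazily and cache it on first access. Below is a self-contained sketch of the same sentinel pattern; the names and types here are illustrative, not the ones in `segment_info.go`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lazyCount mimics the deltaRowcount cache: -1 means "not computed yet";
// the first reader computes the value and stores it for later readers.
type lazyCount struct {
	cached atomic.Int64
}

func newLazyCount() *lazyCount {
	c := &lazyCount{}
	c.cached.Store(-1) // mark as uninitialized, exactly like NewSegmentInfo does
	return c
}

func (c *lazyCount) get(compute func() int64) int64 {
	if c.cached.Load() < 0 {
		// Concurrent callers may both compute; they store the same value,
		// so the race is benign -- the same trade-off getDeltaCount makes
		// in exchange for avoiding a lock.
		c.cached.Store(compute())
	}
	return c.cached.Load()
}

func main() {
	c := newLazyCount()
	fmt.Println(c.get(func() int64 { return 42 })) // computes once: 42
	fmt.Println(c.get(func() int64 { return 99 })) // cached: still 42
}
```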
diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go
index b263cb4a07eb8..a1ca015476053 100644
--- a/internal/rootcoord/quota_center.go
+++ b/internal/rootcoord/quota_center.go
@@ -270,13 +270,14 @@ func getRateTypes(scope internalpb.RateScope, opType opType) typeutil.Set[intern
 
 func (q *QuotaCenter) Start() {
 	q.wg.Add(1)
-	go q.run()
+	go func() {
+		defer q.wg.Done()
+		q.run()
+	}()
 }
 
 // run starts the service of QuotaCenter.
 func (q *QuotaCenter) run() {
-	defer q.wg.Done()
-
 	interval := Params.QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second)
 	log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval))
 	ticker := time.NewTicker(interval)
@@ -957,6 +958,8 @@ func (q *QuotaCenter) calculateWriteRates() error {
 	updateCollectionFactor(memFactors)
 	growingSegFactors := q.getGrowingSegmentsSizeFactor()
 	updateCollectionFactor(growingSegFactors)
+	l0Factors := q.getL0SegmentsSizeFactor()
+	updateCollectionFactor(l0Factors)
 
 	ttCollections := make([]int64, 0)
 	memoryCollections := make([]int64, 0)
@@ -1214,6 +1217,26 @@ func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 {
 	return collectionFactor
}
 
+// getL0SegmentsSizeFactor checks whether any collection's L0 delta row count exceeds the configured water levels and returns a write-rate factor for each collection above the low level.
+func (q *QuotaCenter) getL0SegmentsSizeFactor() map[int64]float64 {
+	if !Params.QuotaConfig.L0SegmentRowCountProtectionEnabled.GetAsBool() {
+		return nil
+	}
+
+	l0SegmentSizeLowWaterLevel := Params.QuotaConfig.L0SegmentRowCountLowWaterLevel.GetAsInt64()
+	l0SegmentSizeHighWaterLevel := Params.QuotaConfig.L0SegmentRowCountHighWaterLevel.GetAsInt64()
+
+	collectionFactor := make(map[int64]float64)
+	for collectionID, l0RowCount := range q.dataCoordMetrics.CollectionL0RowCount {
+		if l0RowCount < l0SegmentSizeLowWaterLevel {
+			continue
+		}
+		factor := float64(l0SegmentSizeHighWaterLevel-l0RowCount) / float64(l0SegmentSizeHighWaterLevel-l0SegmentSizeLowWaterLevel)
+		collectionFactor[collectionID] = factor
+	}
+	return collectionFactor
+}
+
 // calculateRates calculates target rates by different strategies.
 func (q *QuotaCenter) calculateRates() error {
 	err := q.resetAllCurrentRates()
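With the default water levels from `quota_param.go` below (low 32768, high 65536), the factor produced by `getL0SegmentsSizeFactor` falls linearly from 1.0 at the low level to 0.0 at the high level; for example, 49152 L0 delta rows yield (65536-49152)/(65536-32768) = 0.5. A runnable sketch of the curve follows; how non-positive factors are handled downstream is an assumption, not shown in this diff:

```go
package main

import "fmt"

// l0Factor reproduces the linear cooling formula from getL0SegmentsSizeFactor.
// The second return value is false when the row count is below the low water
// level, i.e. no throttling factor is emitted for that collection.
func l0Factor(rowCount, low, high int64) (float64, bool) {
	if rowCount < low {
		return 0, false
	}
	// Note the raw formula goes negative above the high water level;
	// clamping is left to the caller (assumed, not part of this diff).
	return float64(high-rowCount) / float64(high-low), true
}

func main() {
	const low, high = 32768, 65536 // defaults from quota_param.go
	for _, rc := range []int64{16384, 32768, 49152, 65536} {
		if f, ok := l0Factor(rc, low, high); ok {
			fmt.Printf("rows=%d factor=%.2f\n", rc, f) // 32768->1.00, 49152->0.50, 65536->0.00
		} else {
			fmt.Printf("rows=%d no throttling\n", rc)
		}
	}
}
```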
diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go
index 44f609c29f4f6..290da1d473f62 100644
--- a/pkg/util/metricsinfo/quota_metric.go
+++ b/pkg/util/metricsinfo/quota_metric.go
@@ -88,6 +88,8 @@ type DataCoordQuotaMetrics struct {
 	TotalBinlogSize      int64
 	CollectionBinlogSize map[int64]int64
 	PartitionsBinlogSize map[int64]map[int64]int64
+	// l0 segments
+	CollectionL0RowCount map[int64]int64
 }
 
 // DataNodeQuotaMetrics are metrics of DataNode.
diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go
index b19fd71b3b45c..d7a276514af67 100644
--- a/pkg/util/paramtable/quota_param.go
+++ b/pkg/util/paramtable/quota_param.go
@@ -152,6 +152,9 @@ type quotaConfig struct {
 	DiskQuotaPerDB         ParamItem `refreshable:"true"`
 	DiskQuotaPerCollection ParamItem `refreshable:"true"`
 	DiskQuotaPerPartition  ParamItem `refreshable:"true"`
+	L0SegmentRowCountProtectionEnabled ParamItem `refreshable:"true"`
+	L0SegmentRowCountLowWaterLevel     ParamItem `refreshable:"true"`
+	L0SegmentRowCountHighWaterLevel    ParamItem `refreshable:"true"`
 
 	// limit reading
 	ForceDenyReading ParamItem `refreshable:"true"`
@@ -1856,6 +1859,33 @@ but the rate will not be lower than minRateRatio * dmlRate.`,
 	}
 	p.DiskQuotaPerPartition.Init(base.mgr)
 
+	p.L0SegmentRowCountProtectionEnabled = ParamItem{
+		Key:          "quotaAndLimits.limitWriting.l0SegmentsRowCountProtection.enabled",
+		Version:      "2.4.7",
+		DefaultValue: "false",
+		Doc:          "switch to enable l0 segment row count quota",
+		Export:       true,
+	}
+	p.L0SegmentRowCountProtectionEnabled.Init(base.mgr)
+
+	p.L0SegmentRowCountLowWaterLevel = ParamItem{
+		Key:          "quotaAndLimits.limitWriting.l0SegmentsRowCountProtection.lowWaterLevel",
+		Version:      "2.4.7",
+		DefaultValue: "32768",
+		Doc:          "l0 segment row count quota, low water level",
+		Export:       true,
+	}
+	p.L0SegmentRowCountLowWaterLevel.Init(base.mgr)
+
+	p.L0SegmentRowCountHighWaterLevel = ParamItem{
+		Key:          "quotaAndLimits.limitWriting.l0SegmentsRowCountProtection.highWaterLevel",
+		Version:      "2.4.7",
+		DefaultValue: "65536",
+		Doc:          "l0 segment row count quota, high water level",
+		Export:       true,
+	}
+	p.L0SegmentRowCountHighWaterLevel.Init(base.mgr)
+
 	// limit reading
 	p.ForceDenyReading = ParamItem{
 		Key: "quotaAndLimits.limitReading.forceDeny",
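For reference, the new keys would be set in `milvus.yaml` roughly as follows. The keys and defaults are taken directly from the `ParamItem` definitions above; the exact nesting is assumed to follow the usual `quotaAndLimits` layout:

```yaml
# Hypothetical milvus.yaml excerpt enabling L0 row count protection.
quotaAndLimits:
  limitWriting:
    l0SegmentsRowCountProtection:
      enabled: true          # default: false
      lowWaterLevel: 32768   # throttling of write rates starts here
      highWaterLevel: 65536  # the write-rate factor reaches zero here
```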