diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index 5d7f03a1a726b..12b8c1856a81b 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -194,14 +194,15 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle
 	return false, 0
 }
 
-func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
+func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView, expectedSegmentSize int64) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
 	for _, s := range view.GetSegmentsView() {
 		totalRows += s.NumOfRows
 		segmentIDs = append(segmentIDs, s.ID)
 	}
 	clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()
 	clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()
-	maxRows, err := calBySchemaPolicy(coll.Schema)
+
+	maxRows, err := calBySegmentSizePolicy(coll.Schema, expectedSegmentSize)
 	if err != nil {
 		return nil, 0, 0, 0, err
 	}
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 4c5c92242c2b8..96b1e8bd88d73 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -24,10 +24,14 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/logutil"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type CompactionTriggerType int8
@@ -304,7 +308,9 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
 		return
 	}
 
-	_, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view)
+	expectedSegmentSize := m.getExpectedSegmentSize(collection)
+
+	_, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
 	if err != nil {
 		log.Warn("failed to calculate clustering compaction config", zap.String("view", view.String()), zap.Error(err))
 		return
@@ -393,6 +399,29 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
 	)
 }
 
+// getExpectedSegmentSize returns the target segment size in bytes for the
+// collection: the disk segment max size when every vector field carries a
+// disk-based (DiskANN) index, and the default segment max size otherwise.
+func (m *CompactionTriggerManager) getExpectedSegmentSize(collection *collectionInfo) int64 {
+	indexInfos := m.meta.indexMeta.GetIndexesForCollection(collection.ID, "")
+
+	vectorFields := typeutil.GetVectorFieldSchemas(collection.Schema)
+	fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
+		return t.FieldID, GetIndexType(t.IndexParams)
+	})
+	vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
+		if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
+			return indexparamcheck.IsDiskIndex(indexType)
+		}
+		return false
+	})
+
+	allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex)
+	if allDiskIndex {
+		// Only when every vector field is indexed with DiskANN do we use the disk-based segment max size.
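+		// DiskSegmentMaxSize is configured in MB; convert it to bytes here.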
+		return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
+	}
+	// If any vector field is not indexed with DiskANN, fall back to the default segment max size.
+	return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
+}
+
 // chanPartSegments is an internal result struct that aggregates SegmentInfos with the same collectionID, partitionID and channelName
 type chanPartSegments struct {
 	collectionID UniqueID
diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go
index dabd84b6219e5..017e743930715 100644
--- a/internal/datacoord/compaction_trigger_v2_test.go
+++ b/internal/datacoord/compaction_trigger_v2_test.go
@@ -2,6 +2,7 @@ package datacoord
 
 import (
 	"context"
+	"strconv"
 	"testing"
 
 	"github.com/samber/lo"
@@ -9,8 +10,13 @@ import (
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 func TestCompactionTriggerManagerSuite(t *testing.T) {
@@ -140,3 +146,168 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
 	s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
 	s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
 }
+
+func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
+	var (
+		collectionID = int64(1000)
+		fieldID      = int64(2000)
+		indexID      = int64(3000)
+	)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, strconv.Itoa(100))
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key, strconv.Itoa(200))
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key)
+
+	s.triggerManager.meta = &meta{
+		indexMeta: &indexMeta{
+			indexes: map[UniqueID]map[UniqueID]*model.Index{
+				collectionID: {
+					indexID + 1: &model.Index{
+						CollectionID: collectionID,
+						FieldID:      fieldID + 1,
+						IndexID:      indexID + 1,
+						IndexName:    "",
+						IsDeleted:    false,
+						CreateTime:   0,
+						TypeParams:   nil,
+						IndexParams: []*commonpb.KeyValuePair{
+							{Key: common.IndexTypeKey, Value: "DISKANN"},
+						},
+						IsAutoIndex:     false,
+						UserIndexParams: nil,
+					},
+					indexID + 2: &model.Index{
+						CollectionID: collectionID,
+						FieldID:      fieldID + 2,
+						IndexID:      indexID + 2,
+						IndexName:    "",
+						IsDeleted:    false,
+						CreateTime:   0,
+						TypeParams:   nil,
+						IndexParams: []*commonpb.KeyValuePair{
+							{Key: common.IndexTypeKey, Value: "DISKANN"},
+						},
+						IsAutoIndex:     false,
+						UserIndexParams: nil,
+					},
+				},
+			},
+		},
+	}
+
+	s.Run("all DISKANN", func() {
+		collection := &collectionInfo{
+			ID: collectionID,
+			Schema: &schemapb.CollectionSchema{
+				Name:        "coll1",
+				Description: "",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+					{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+					{FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+				},
+				EnableDynamicField: false,
+				Properties:         nil,
+			},
+		}
+
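+		// every vector field has a DISKANN index, so the 200 MB disk segment max size applies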
+		s.Equal(int64(200*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
+	})
+
+	s.Run("HNSW & DISKANN", func() {
+		s.triggerManager.meta = &meta{
+			indexMeta: &indexMeta{
+				indexes: map[UniqueID]map[UniqueID]*model.Index{
+					collectionID: {
+						indexID + 1: &model.Index{
+							CollectionID: collectionID,
+							FieldID:      fieldID + 1,
+							IndexID:      indexID + 1,
+							IndexName:    "",
+							IsDeleted:    false,
+							CreateTime:   0,
+							TypeParams:   nil,
+							IndexParams: []*commonpb.KeyValuePair{
+								{Key: common.IndexTypeKey, Value: "HNSW"},
+							},
+							IsAutoIndex:     false,
+							UserIndexParams: nil,
+						},
+						indexID + 2: &model.Index{
+							CollectionID: collectionID,
+							FieldID:      fieldID + 2,
+							IndexID:      indexID + 2,
+							IndexName:    "",
+							IsDeleted:    false,
+							CreateTime:   0,
+							TypeParams:   nil,
+							IndexParams: []*commonpb.KeyValuePair{
+								{Key: common.IndexTypeKey, Value: "DISKANN"},
+							},
+							IsAutoIndex:     false,
+							UserIndexParams: nil,
+						},
+					},
+				},
+			},
+		}
+		collection := &collectionInfo{
+			ID: collectionID,
+			Schema: &schemapb.CollectionSchema{
+				Name:        "coll1",
+				Description: "",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+					{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+					{FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+				},
+				EnableDynamicField: false,
+				Properties:         nil,
+			},
+		}
+
+		// field1 uses HNSW, which is not a disk index, so the default 100 MB segment max size applies
+		s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
+	})
+
+	s.Run("some vector has no index", func() {
+		s.triggerManager.meta = &meta{
+			indexMeta: &indexMeta{
+				indexes: map[UniqueID]map[UniqueID]*model.Index{
+					collectionID: {
+						indexID + 1: &model.Index{
+							CollectionID: collectionID,
+							FieldID:      fieldID + 1,
+							IndexID:      indexID + 1,
+							IndexName:    "",
+							IsDeleted:    false,
+							CreateTime:   0,
+							TypeParams:   nil,
+							IndexParams: []*commonpb.KeyValuePair{
+								{Key: common.IndexTypeKey, Value: "HNSW"},
+							},
+							IsAutoIndex:     false,
+							UserIndexParams: nil,
+						},
+					},
+				},
+			},
+		}
+		collection := &collectionInfo{
+			ID: collectionID,
+			Schema: &schemapb.CollectionSchema{
+				Name:        "coll1",
+				Description: "",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+					{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+					{FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+				},
+				EnableDynamicField: false,
+				Properties:         nil,
+			},
+		}
+
+		// field2 has no index at all, so the default 100 MB segment max size applies
+		s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
+	})
+}
diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go
index 2c4f6bc1778e9..769b918271b4a 100644
--- a/internal/datacoord/segment_allocation_policy.go
+++ b/internal/datacoord/segment_allocation_policy.go
@@ -66,6 +66,21 @@ func calBySchemaPolicyWithDiskIndex(schema *schemapb.CollectionSchema) (int, err
 	return int(threshold / float64(sizePerRecord)), nil
 }
 
+// calBySegmentSizePolicy returns the maximum number of rows that fit into a
+// segment of the given byte size, based on the schema's estimated per-record size.
+func calBySegmentSizePolicy(schema *schemapb.CollectionSchema, segmentSize int64) (int, error) {
+	if schema == nil {
+		return -1, errors.New("nil schema")
+	}
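+	// EstimateSizePerRecord estimates the byte size of one row from the schema,
+	// e.g. 4*dim bytes for a FloatVector field plus the widths of scalar fields.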
+	sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
+	if err != nil {
+		return -1, err
+	}
+	// guard against a zero per-record size to prevent a division-by-zero panic
+	if sizePerRecord == 0 {
+		return -1, errors.New("zero size record schema found")
+	}
+	return int(segmentSize) / sizePerRecord, nil
+}
+
 // AllocatePolicy helper function definition to allocate Segment space
 type AllocatePolicy func(segments []*SegmentInfo, count int64, maxCountPerL1Segment int64, level datapb.SegmentLevel) ([]*Allocation, []*Allocation)
diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go
index fc6f77ddc83e9..fa803b227ec2c 100644
--- a/internal/datacoord/segment_allocation_policy_test.go
+++ b/internal/datacoord/segment_allocation_policy_test.go
@@ -140,6 +140,58 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) {
 	}
 }
 
+func TestCalBySegmentSizePolicy(t *testing.T) {
+	t.Run("nil schema", func(t *testing.T) {
+		rows, err := calBySegmentSizePolicy(nil, 1024)
+
+		assert.Error(t, err)
+		assert.Equal(t, -1, rows)
+	})
+
+	t.Run("get dim failed", func(t *testing.T) {
+		schema := &schemapb.CollectionSchema{
+			Name:        "coll1",
+			Description: "",
+			Fields: []*schemapb.FieldSchema{
+				{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+				{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "fake"}}},
+			},
+			EnableDynamicField: false,
+			Properties:         nil,
+		}
+
+		rows, err := calBySegmentSizePolicy(schema, 1024)
+		assert.Error(t, err)
+		assert.Equal(t, -1, rows)
+	})
+
+	t.Run("sizePerRecord is zero", func(t *testing.T) {
+		schema := &schemapb.CollectionSchema{Fields: nil}
+		rows, err := calBySegmentSizePolicy(schema, 1024)
+
+		assert.Error(t, err)
+		assert.Equal(t, -1, rows)
+	})
+
+	t.Run("normal case", func(t *testing.T) {
+		schema := &schemapb.CollectionSchema{
+			Name:        "coll1",
+			Description: "",
+			Fields: []*schemapb.FieldSchema{
+				{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+				{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
+			},
+			EnableDynamicField: false,
+			Properties:         nil,
+		}
+
+		rows, err := calBySegmentSizePolicy(schema, 1200)
+		assert.NoError(t, err)
+		// 1200 / (4*8 + 8) = 30 rows
+		assert.Equal(t, 30, rows)
+	})
+}
+
 func TestSortSegmentsByLastExpires(t *testing.T) {
 	segs := make([]*SegmentInfo, 0, 10)
 	for i := 0; i < 10; i++ {