Skip to content

Commit

Permalink
enhance: [cherry-pick] Support calculating segment maxRows with DiskANN (#…
Browse files Browse the repository at this point in the history
…35155)

issue: #34495 

master pr: #35076

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Aug 2, 2024
1 parent f718410 commit 5f3d41d
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 3 deletions.
5 changes: 3 additions & 2 deletions internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,15 @@ func (policy *clusteringCompactionPolicy) collectionIsClusteringCompacting(colle
return false, 0
}

func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionView, expectedSegmentSize int64) (segmentIDs []int64, totalRows, maxSegmentRows, preferSegmentRows int64, err error) {
for _, s := range view.GetSegmentsView() {
totalRows += s.NumOfRows
segmentIDs = append(segmentIDs, s.ID)
}
clusteringMaxSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()
clusteringPreferSegmentSizeRatio := paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()
maxRows, err := calBySchemaPolicy(coll.Schema)

maxRows, err := calBySegmentSizePolicy(coll.Schema, expectedSegmentSize)
if err != nil {
return nil, 0, 0, 0, err
}
Expand Down
31 changes: 30 additions & 1 deletion internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type CompactionTriggerType int8
Expand Down Expand Up @@ -304,7 +308,9 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
return
}

_, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view)
expectedSegmentSize := m.getExpectedSegmentSize(collection)

_, totalRows, maxSegmentRows, preferSegmentRows, err := calculateClusteringCompactionConfig(collection, view, expectedSegmentSize)
if err != nil {
log.Warn("Failed to calculate cluster compaction config fail", zap.String("view", view.String()), zap.Error(err))
return
Expand Down Expand Up @@ -393,6 +399,29 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
)
}

// getExpectedSegmentSize returns the expected maximum segment size in bytes
// for the given collection. When every vector field in the collection schema
// is covered by a disk-based index (e.g. DiskANN), the disk segment size
// limit applies; otherwise the default segment size limit is used.
func (m *CompactionTriggerManager) getExpectedSegmentSize(collection *collectionInfo) int64 {
	indexInfos := m.meta.indexMeta.GetIndexesForCollection(collection.ID, "")

	// Map each indexed field to its index type.
	indexTypeByField := make(map[int64]indexparamcheck.IndexType, len(indexInfos))
	for _, idx := range indexInfos {
		indexTypeByField[idx.FieldID] = GetIndexType(idx.IndexParams)
	}

	// The disk-based limit applies only if every vector field has a disk index;
	// an unindexed vector field disqualifies the collection as well.
	allDiskIndex := true
	for _, field := range typeutil.GetVectorFieldSchemas(collection.Schema) {
		indexType, indexed := indexTypeByField[field.FieldID]
		if !indexed || !indexparamcheck.IsDiskIndex(indexType) {
			allDiskIndex = false
			break
		}
	}

	if allDiskIndex {
		// Only if all vector fields index type are DiskANN, recalc segment max size here.
		return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
	}
	// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
	return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
}

// chanPartSegments is an internal result struct, which is aggregates of SegmentInfos with same collectionID, partitionID and channelName
type chanPartSegments struct {
collectionID UniqueID
Expand Down
171 changes: 171 additions & 0 deletions internal/datacoord/compaction_trigger_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@ package datacoord

import (
"context"
"strconv"
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestCompactionTriggerManagerSuite(t *testing.T) {
Expand Down Expand Up @@ -140,3 +146,168 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() {
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(19530, nil).Maybe()
s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView)
}

// TestGetExpectedSegmentSize verifies that the disk-based segment size limit
// is used only when every vector field carries a DiskANN index, and that the
// default limit applies when any vector field has a non-disk index or no
// index at all.
func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() {
	var (
		collectionID = int64(1000)
		fieldID      = int64(2000)
		indexID      = int64(3000)
	)
	paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, strconv.Itoa(100))
	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)

	paramtable.Get().Save(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key, strconv.Itoa(200))
	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.Key)

	// buildIndex creates an index on field fieldID+offset with the given type.
	buildIndex := func(offset int64, indexType string) *model.Index {
		return &model.Index{
			CollectionID: collectionID,
			FieldID:      fieldID + offset,
			IndexID:      indexID + offset,
			IndexParams: []*commonpb.KeyValuePair{
				{Key: common.IndexTypeKey, Value: indexType},
			},
		}
	}

	// setIndexMeta installs the given indexes as the trigger manager's meta.
	setIndexMeta := func(idxs ...*model.Index) {
		indexes := make(map[UniqueID]*model.Index, len(idxs))
		for _, idx := range idxs {
			indexes[idx.IndexID] = idx
		}
		s.triggerManager.meta = &meta{
			indexMeta: &indexMeta{
				indexes: map[UniqueID]map[UniqueID]*model.Index{
					collectionID: indexes,
				},
			},
		}
	}

	// Shared fixture: an int64 primary key plus two vector fields.
	collection := &collectionInfo{
		ID: collectionID,
		Schema: &schemapb.CollectionSchema{
			Name: "coll1",
			Fields: []*schemapb.FieldSchema{
				{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
				{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
				{FieldID: fieldID + 2, Name: "field2", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
			},
			EnableDynamicField: false,
		},
	}

	s.Run("all DISKANN", func() {
		setIndexMeta(buildIndex(1, "DISKANN"), buildIndex(2, "DISKANN"))

		// Both vector fields have DiskANN -> disk segment size (200 MiB).
		s.Equal(int64(200*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
	})

	s.Run("HNSW & DISKANN", func() {
		setIndexMeta(buildIndex(1, "HNSW"), buildIndex(2, "DISKANN"))

		// A non-disk index on one vector field -> default segment size (100 MiB).
		s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
	})

	s.Run("some vector has no index", func() {
		setIndexMeta(buildIndex(1, "HNSW"))

		// One vector field is unindexed -> default segment size (100 MiB).
		s.Equal(int64(100*1024*1024), s.triggerManager.getExpectedSegmentSize(collection))
	})
}
15 changes: 15 additions & 0 deletions internal/datacoord/segment_allocation_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func calBySchemaPolicyWithDiskIndex(schema *schemapb.CollectionSchema) (int, err
return int(threshold / float64(sizePerRecord)), nil
}

// calBySegmentSizePolicy derives the maximum number of rows that fit into a
// segment of segmentSize bytes, based on the estimated per-row size of the
// given schema. It returns -1 together with an error when the schema is nil,
// when the per-row size cannot be estimated, or when the estimate is zero.
func calBySegmentSizePolicy(schema *schemapb.CollectionSchema, segmentSize int64) (int, error) {
	if schema == nil {
		return -1, errors.New("nil schema")
	}

	rowSize, err := typeutil.EstimateSizePerRecord(schema)
	if err != nil {
		return -1, err
	}

	// Guard against a division-by-zero panic for degenerate schemas.
	if rowSize == 0 {
		return -1, errors.New("zero size record schema found")
	}

	return int(segmentSize) / rowSize, nil
}

// AllocatePolicy helper function definition to allocate Segment space
type AllocatePolicy func(segments []*SegmentInfo, count int64,
maxCountPerL1Segment int64, level datapb.SegmentLevel) ([]*Allocation, []*Allocation)
Expand Down
52 changes: 52 additions & 0 deletions internal/datacoord/segment_allocation_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,58 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) {
}
}

// TestCalBySegmentSizePolicy covers the error paths (nil schema, invalid dim,
// zero-size record) and the normal row-count computation of
// calBySegmentSizePolicy.
func TestCalBySegmentSizePolicy(t *testing.T) {
	// makeSchema builds a pk + float-vector schema with the given dim value,
	// which may be intentionally invalid to trigger estimation failures.
	makeSchema := func(dim string) *schemapb.CollectionSchema {
		return &schemapb.CollectionSchema{
			Name: "coll1",
			Fields: []*schemapb.FieldSchema{
				{FieldID: fieldID, Name: "field0", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
				{FieldID: fieldID + 1, Name: "field1", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: dim}}},
			},
			EnableDynamicField: false,
		}
	}

	t.Run("nil schema", func(t *testing.T) {
		rows, err := calBySegmentSizePolicy(nil, 1024)

		assert.Error(t, err)
		assert.Equal(t, -1, rows)
	})

	t.Run("get dim failed", func(t *testing.T) {
		// A non-numeric dim makes EstimateSizePerRecord fail.
		rows, err := calBySegmentSizePolicy(makeSchema("fake"), 1024)

		assert.Error(t, err)
		assert.Equal(t, -1, rows)
	})

	t.Run("sizePerRecord is zero", func(t *testing.T) {
		// An empty schema estimates to zero bytes per record.
		rows, err := calBySegmentSizePolicy(&schemapb.CollectionSchema{Fields: nil}, 1024)

		assert.Error(t, err)
		assert.Equal(t, -1, rows)
	})

	t.Run("normal case", func(t *testing.T) {
		rows, err := calBySegmentSizePolicy(makeSchema("8"), 1200)

		assert.NoError(t, err)
		// 1200 bytes / (4 bytes * 8 dims + 8 bytes int64 pk) = 30 rows.
		assert.Equal(t, 30, rows)
	})
}

func TestSortSegmentsByLastExpires(t *testing.T) {
segs := make([]*SegmentInfo, 0, 10)
for i := 0; i < 10; i++ {
Expand Down

0 comments on commit 5f3d41d

Please sign in to comment.