Skip to content

Commit

Permalink
enhance: use different value to get related data size according to segment type (#33017)
Browse files Browse the repository at this point in the history

issue: #30436

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored May 14, 2024
1 parent 4fc7915 commit 1d48d0a
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 48 deletions.
2 changes: 1 addition & 1 deletion internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ message SegmentLoadInfo {
repeated data.FieldBinlog deltalogs = 9;
repeated int64 compactionFrom = 10; // segmentIDs compacted from
repeated FieldIndexInfo index_infos = 11;
int64 segment_size = 12;
int64 segment_size = 12 [deprecated = true];
string insert_channel = 13;
msg.MsgPosition start_position = 14;
msg.MsgPosition delta_position = 15;
Expand Down
43 changes: 0 additions & 43 deletions internal/querycoordv2/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,52 +86,9 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M
Level: segment.GetLevel(),
StorageVersion: segment.GetStorageVersion(),
}
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
return loadInfo
}

// calculateSegmentSize estimates a segment's size by summing, per field,
// either the size of its enabled index or the raw binlog size, plus the
// sizes of the stats logs and delete logs.
func calculateSegmentSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
	// Map fields that carry an enabled index so the index size can be
	// substituted for the raw binlog size below.
	indexed := make(map[int64]*querypb.FieldIndexInfo, len(segmentLoadInfo.IndexInfos))
	for _, info := range segmentLoadInfo.IndexInfos {
		if info.EnableIndex {
			indexed[info.FieldID] = info
		}
	}

	var total int64
	for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
		if info, found := indexed[fieldBinlog.FieldID]; found {
			total += info.IndexSize
		} else {
			total += getFieldSizeFromFieldBinlog(fieldBinlog)
		}
	}

	// Size of stats data.
	for _, statslog := range segmentLoadInfo.Statslogs {
		total += getFieldSizeFromFieldBinlog(statslog)
	}

	// Size of delete data.
	for _, deltalog := range segmentLoadInfo.Deltalogs {
		total += getFieldSizeFromFieldBinlog(deltalog)
	}

	return total
}

// getFieldSizeFromFieldBinlog returns the total log size of all binlogs
// belonging to the given field.
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
	var total int64
	for _, b := range fieldBinlog.Binlogs {
		total += b.LogSize
	}
	return total
}

func MergeDmChannelInfo(infos []*datapb.VchannelInfo) *meta.DmChannel {
var dmChannel *meta.DmChannel

Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy
Ids: result.GetIds(),
FieldsData: result.GetFieldsData(),
CostAggregation: &internalpb.CostAggregation{
TotalRelatedDataSize: segment.MemSize(),
TotalRelatedDataSize: GetSegmentRelatedDataSize(segment),
},
AllRetrieveCount: result.GetAllRetrieveCount(),
}); err != nil {
Expand Down
38 changes: 38 additions & 0 deletions internal/querynodev2/segments/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -206,3 +208,39 @@ func withLazyLoadTimeoutContext(ctx context.Context) (context.Context, context.C
// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
return contextutil.WithTimeoutCause(ctx, lazyLoadTimeout, errLazyLoadTimeout)
}

// GetSegmentRelatedDataSize reports the amount of data associated with a
// segment: sealed segments report the total log size from their load info,
// while all other segment types (e.g. growing) report their in-memory size.
func GetSegmentRelatedDataSize(segment Segment) int64 {
	if segment.Type() != SegmentTypeSealed {
		return segment.MemSize()
	}
	return calculateSegmentLogSize(segment.LoadInfo())
}

// calculateSegmentLogSize sums the log sizes of a segment's insert binlogs,
// stats logs, and delete logs as recorded in its load info.
func calculateSegmentLogSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
	var total int64
	groups := [][]*datapb.FieldBinlog{
		segmentLoadInfo.BinlogPaths, // insert data
		segmentLoadInfo.Statslogs,   // stats data
		segmentLoadInfo.Deltalogs,   // delete data
	}
	for _, group := range groups {
		for _, fieldBinlog := range group {
			total += getFieldSizeFromFieldBinlog(fieldBinlog)
		}
	}
	return total
}

// getFieldSizeFromFieldBinlog accumulates the log size of every binlog
// under the given field binlog entry.
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
	size := int64(0)
	for idx := range fieldBinlog.Binlogs {
		size += fieldBinlog.Binlogs[idx].LogSize
	}
	return size
}
57 changes: 57 additions & 0 deletions internal/querynodev2/segments/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)

func TestFilterZeroValuesFromSlice(t *testing.T) {
Expand All @@ -18,3 +21,57 @@ func TestFilterZeroValuesFromSlice(t *testing.T) {
assert.Equal(t, 3, len(filteredInts))
assert.EqualValues(t, []int64{10, 5, 13}, filteredInts)
}

func TestGetSegmentRelatedDataSize(t *testing.T) {
t.Run("seal segment", func(t *testing.T) {
segment := NewMockSegment(t)
segment.EXPECT().Type().Return(SegmentTypeSealed)
segment.EXPECT().LoadInfo().Return(&querypb.SegmentLoadInfo{
BinlogPaths: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
LogSize: 10,
},
{
LogSize: 20,
},
},
},
{
Binlogs: []*datapb.Binlog{
{
LogSize: 30,
},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
LogSize: 30,
},
},
},
},
Statslogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
LogSize: 10,
},
},
},
},
})
assert.EqualValues(t, 100, GetSegmentRelatedDataSize(segment))
})

t.Run("growing segment", func(t *testing.T) {
segment := NewMockSegment(t)
segment.EXPECT().Type().Return(SegmentTypeGrowing)
segment.EXPECT().MemSize().Return(int64(100))
assert.EqualValues(t, 100, GetSegmentRelatedDataSize(segment))
})
}
2 changes: 1 addition & 1 deletion internal/querynodev2/tasks/query_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (t *QueryTask) Execute() error {
}

relatedDataSize := lo.Reduce(querySegments, func(acc int64, seg segments.Segment, _ int) int64 {
return acc + seg.MemSize()
return acc + segments.GetSegmentRelatedDataSize(seg)
}, 0)

t.result = &internalpb.RetrieveResults{
Expand Down
12 changes: 10 additions & 2 deletions internal/querynodev2/tasks/search_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (t *SearchTask) Execute() error {
}

relatedDataSize := lo.Reduce(searchedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
return acc + seg.MemSize()
return acc + segments.GetSegmentRelatedDataSize(seg)
}, 0)

tr.RecordSpan()
Expand Down Expand Up @@ -445,6 +445,7 @@ func (t *StreamingSearchTask) Execute() error {

// 1. search&&reduce or streaming-search&&streaming-reduce
metricType := searchReq.Plan().GetMetricType()
var relatedDataSize int64
if req.GetScope() == querypb.DataScope_Historical {
streamReduceFunc := func(result *segments.SearchResult) error {
reduceErr := t.streamReduce(t.ctx, searchReq.Plan(), result, t.originNqs, t.originTopks)
Expand All @@ -470,6 +471,9 @@ func (t *StreamingSearchTask) Execute() error {
log.Error("Failed to get stream-reduced search result")
return err
}
relatedDataSize = lo.Reduce(pinnedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
return acc + segments.GetSegmentRelatedDataSize(seg)
}, 0)
} else if req.GetScope() == querypb.DataScope_Streaming {
results, pinnedSegments, err := segments.SearchStreaming(
t.ctx,
Expand Down Expand Up @@ -507,6 +511,9 @@ func (t *StreamingSearchTask) Execute() error {
metrics.ReduceSegments,
metrics.BatchReduce).
Observe(float64(tr.RecordSpan().Milliseconds()))
relatedDataSize = lo.Reduce(pinnedSegments, func(acc int64, seg segments.Segment, _ int) int64 {
return acc + segments.GetSegmentRelatedDataSize(seg)
}, 0)
}

// 2. reorganize blobs to original search request
Expand Down Expand Up @@ -539,7 +546,8 @@ func (t *StreamingSearchTask) Execute() error {
SlicedOffset: 1,
SlicedNumCount: 1,
CostAggregation: &internalpb.CostAggregation{
ServiceTime: tr.ElapseSpan().Milliseconds(),
ServiceTime: tr.ElapseSpan().Milliseconds(),
TotalRelatedDataSize: relatedDataSize,
},
}
}
Expand Down

0 comments on commit 1d48d0a

Please sign in to comment.