Skip to content

Commit

Permalink
fix: [cp24]l0RowCount metrics value always empty (#37307)
Browse files Browse the repository at this point in the history
See also: #36953
pr: #37306

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Nov 4, 2024
1 parent 4fb86eb commit 28fd217
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
23 changes: 12 additions & 11 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,17 +718,14 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID))

modPack.segments[segmentID] = &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
},
}
modPack.segments[segmentID] = NewSegmentInfo(&datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channel,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
})
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 0)
}
return true
Expand Down Expand Up @@ -856,6 +853,10 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb
segment.Binlogs = mergeFieldBinlogs(segment.GetBinlogs(), binlogs)
segment.Statslogs = mergeFieldBinlogs(segment.GetStatslogs(), statslogs)
segment.Deltalogs = mergeFieldBinlogs(segment.GetDeltalogs(), deltalogs)
if len(deltalogs) > 0 {
segment.deltaRowcount.Store(-1)
}

modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}
Expand Down
10 changes: 8 additions & 2 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -727,13 +728,15 @@ func TestUpdateSegmentsInfo(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)

segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
segment1 := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
}}
})
err = meta.AddSegment(context.TODO(), segment1)
assert.NoError(t, err)
require.EqualValues(t, -1, segment1.deltaRowcount.Load())
assert.EqualValues(t, 0, segment1.getDeltaCount())

err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
Expand All @@ -748,6 +751,9 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.NoError(t, err)

updated := meta.GetHealthySegment(1)
assert.EqualValues(t, -1, updated.deltaRowcount.Load())
assert.EqualValues(t, 1, updated.getDeltaCount())

expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}},
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,9 @@ func (s *SegmentInfo) getSegmentSize() int64 {
return s.size.Load()
}

// getDeltaCount use cached value when segment is immutable
// Any edits on deltalogs of flushed segments will reset deltaRowcount to -1
func (s *SegmentInfo) getDeltaCount() int64 {
if s.deltaRowcount.Load() < 0 || s.State != commonpb.SegmentState_Flushed {
if s.deltaRowcount.Load() < 0 || s.GetState() != commonpb.SegmentState_Flushed {
var rc int64
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
Expand Down

0 comments on commit 28fd217

Please sign in to comment.