diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index dec075915c11d..7f048bdd04808 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -727,17 +727,14 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID)) - modPack.segments[segmentID] = &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: segmentID, - CollectionID: collectionID, - PartitionID: partitionID, - InsertChannel: channel, - NumOfRows: 0, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L0, - }, - } + modPack.segments[segmentID] = NewSegmentInfo(&datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + }) modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, false, 0) } return true @@ -882,6 +879,10 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs segment.Binlogs = mergeFieldBinlogs(segment.GetBinlogs(), binlogs) segment.Statslogs = mergeFieldBinlogs(segment.GetStatslogs(), statslogs) segment.Deltalogs = mergeFieldBinlogs(segment.GetDeltalogs(), deltalogs) + if len(deltalogs) > 0 { + segment.deltaRowcount.Store(-1) + } + segment.Bm25Statslogs = mergeFieldBinlogs(segment.GetBm25Statslogs(), bm25logs) modPack.increments[segmentID] = metastore.BinlogsIncrement{ Segment: segment.SegmentInfo, diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 8492173123b7b..13876fbb34f21 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "google.golang.org/protobuf/proto" @@ -661,13 +662,15 @@ func TestUpdateSegmentsInfo(t *testing.T) { meta, err := newMemoryMeta() assert.NoError(t, err) - segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + segment1 := NewSegmentInfo(&datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, - }} + }) err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) + require.EqualValues(t, -1, segment1.deltaRowcount.Load()) + assert.EqualValues(t, 0, segment1.getDeltaCount()) err = meta.UpdateSegmentsInfo( UpdateStatusOperator(1, commonpb.SegmentState_Flushing), @@ -683,6 +686,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { assert.NoError(t, err) updated := meta.GetHealthySegment(1) + assert.EqualValues(t, -1, updated.deltaRowcount.Load()) + assert.EqualValues(t, 1, updated.getDeltaCount()) + expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}, diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index a9ac2dc671a7c..1cee29f59bbbd 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -508,9 +508,9 @@ func (s *SegmentInfo) getSegmentSize() int64 { return s.size.Load() } -// getDeltaCount use cached value when segment is immutable +// Any edits on deltalogs of flushed segments will reset deltaRowcount to -1 func (s *SegmentInfo) getDeltaCount() int64 { - if s.deltaRowcount.Load() < 0 || s.State != commonpb.SegmentState_Flushed { + if s.deltaRowcount.Load() < 0 || s.GetState() != commonpb.SegmentState_Flushed { var rc int64 for _, deltaLogs := range s.GetDeltalogs() { for _, l := range deltaLogs.GetBinlogs() {