From 266326fe1cc3b547824ffc449698401de65a226d Mon Sep 17 00:00:00 2001 From: yangxuan Date: Fri, 1 Nov 2024 20:22:34 +0800 Subject: [PATCH] fix store deltaRowcount Signed-off-by: yangxuan --- internal/datacoord/meta.go | 4 ++++ internal/datacoord/meta_test.go | 10 ++++++++-- internal/datacoord/segment_info.go | 5 ++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index c8a7af987c3f2..7f048bdd04808 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -879,6 +879,10 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs segment.Binlogs = mergeFieldBinlogs(segment.GetBinlogs(), binlogs) segment.Statslogs = mergeFieldBinlogs(segment.GetStatslogs(), statslogs) segment.Deltalogs = mergeFieldBinlogs(segment.GetDeltalogs(), deltalogs) + if len(deltalogs) > 0 { + segment.deltaRowcount.Store(-1) + } + segment.Bm25Statslogs = mergeFieldBinlogs(segment.GetBm25Statslogs(), bm25logs) modPack.increments[segmentID] = metastore.BinlogsIncrement{ Segment: segment.SegmentInfo, diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 8492173123b7b..13876fbb34f21 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "google.golang.org/protobuf/proto" @@ -661,13 +662,15 @@ func TestUpdateSegmentsInfo(t *testing.T) { meta, err := newMemoryMeta() assert.NoError(t, err) - segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + segment1 := NewSegmentInfo(&datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, - }} + }) err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) + require.EqualValues(t, -1, segment1.deltaRowcount.Load()) + assert.EqualValues(t, 0, segment1.getDeltaCount()) err = meta.UpdateSegmentsInfo( UpdateStatusOperator(1, commonpb.SegmentState_Flushing), @@ -683,6 +686,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { assert.NoError(t, err) updated := meta.GetHealthySegment(1) + assert.EqualValues(t, -1, updated.deltaRowcount.Load()) + assert.EqualValues(t, 1, updated.getDeltaCount()) + expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}, diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 11d1eb357131f..1cee29f59bbbd 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -508,10 +508,9 @@ func (s *SegmentInfo) getSegmentSize() int64 { return s.size.Load() } -// L1 segment deltaCount changes in any state -// L0 segment deltaCount won't change +// Any edits on deltalogs of flushed segments will reset deltaRowcount to -1 func (s *SegmentInfo) getDeltaCount() int64 { - if s.deltaRowcount.Load() < 0 || s.GetLevel() != datapb.SegmentLevel_L0 { + if s.deltaRowcount.Load() < 0 || s.GetState() != commonpb.SegmentState_Flushed { var rc int64 for _, deltaLogs := range s.GetDeltalogs() { for _, l := range deltaLogs.GetBinlogs() {