Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: Set an empty segment if compaction deleted all inserts (milvus-i…
Browse files Browse the repository at this point in the history
…o#36044)

See also: milvus-io#36038

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn authored and chyezh committed Sep 11, 2024
1 parent 2e85384 commit b955ae8
Showing 2 changed files with 31 additions and 8 deletions.
18 changes: 11 additions & 7 deletions internal/datanode/compaction/mix_compactor_test.go
Original file line number Diff line number Diff line change
@@ -270,7 +270,12 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {

compactionSegments, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, nil)
s.NoError(err)
s.Equal(0, len(compactionSegments))
s.Equal(1, len(compactionSegments))
s.EqualValues(0, compactionSegments[0].GetNumOfRows())
s.EqualValues(19531, compactionSegments[0].GetSegmentID())
s.Empty(compactionSegments[0].GetDeltalogs())
s.Empty(compactionSegments[0].GetInsertLogs())
s.Empty(compactionSegments[0].GetField2StatslogPaths())
}

func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
@@ -280,10 +285,11 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
description string
deletions map[interface{}]uint64
expectedRes int
leftNumRows int
}{
{"no deletion", nil, 1},
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1},
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 0},
{"no deletion", nil, 1, 1},
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1, 1},
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 1, 0},
}

alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
@@ -304,9 +310,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
res, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, test.deletions)
s.NoError(err)
s.EqualValues(test.expectedRes, len(res))
if test.expectedRes > 0 {
s.EqualValues(1, res[0].GetNumOfRows())
}
s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
})
}
}
21 changes: 20 additions & 1 deletion internal/datanode/compaction/segment_writer.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ type MultiSegmentWriter struct {
// segID -> fieldID -> binlogs

res []*datapb.CompactionSegment
// DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord
}

type compactionAlloactor struct {
@@ -195,9 +196,27 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error {
return writer.Write(v)
}

// Could return an empty list if every insert of the segment is deleted
func (w *MultiSegmentWriter) appendEmptySegment() error {
writer, err := w.getWriter()
if err != nil {
return err
}

w.res = append(w.res, &datapb.CompactionSegment{
SegmentID: writer.GetSegmentID(),
NumOfRows: 0,
Channel: w.channel,
})
return nil
}

// DONOT return an empty list if every insert of the segment is deleted,
// append an empty segment instead
func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) {
if w.current == -1 {
if err := w.appendEmptySegment(); err != nil {
return nil, err
}
return w.res, nil
}

0 comments on commit b955ae8

Please sign in to comment.