Skip to content

Commit

Permalink
fix UT failures and code check breaks
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu committed Jan 7, 2025
1 parent 948a29f commit f220fc5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 91 deletions.
34 changes: 0 additions & 34 deletions internal/flushcommon/syncmgr/options.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package syncmgr

import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func NewSyncTask() *SyncTask {
Expand All @@ -18,7 +15,6 @@ func (t *SyncTask) WithSyncPack(pack *SyncPack) *SyncTask {
t.pack = pack

// legacy code, remove later
t.bm25Binlogs = make(map[int64]*datapb.FieldBinlog)
t.collectionID = t.pack.collectionID
t.partitionID = t.pack.partitionID
t.channelName = t.pack.channelName
Expand Down Expand Up @@ -46,21 +42,6 @@ func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask {
return t
}

func (t *SyncTask) WithCheckpoint(cp *msgpb.MsgPosition) *SyncTask {
t.checkpoint = cp
return t
}

func (t *SyncTask) WithTimeRange(from, to typeutil.Timestamp) *SyncTask {
t.tsFrom, t.tsTo = from, to
return t
}

func (t *SyncTask) WithFlush() *SyncTask {
t.pack.isFlush = true
return t
}

func (t *SyncTask) WithDrop() *SyncTask {
t.pack.isDrop = true
return t
Expand All @@ -85,18 +66,3 @@ func (t *SyncTask) WithFailureCallback(callback func(error)) *SyncTask {
t.failureCallback = callback
return t
}

func (t *SyncTask) WithBatchRows(batchRows int64) *SyncTask {
t.batchRows = batchRows
return t
}

func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask {
t.level = level
return t
}

func (t *SyncTask) WithDataSource(source string) *SyncTask {
t.dataSource = source
return t
}
12 changes: 0 additions & 12 deletions internal/flushcommon/syncmgr/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,6 @@ func (s *SyncManagerSuite) TestSubmit() {
manager := NewSyncManager(s.chunkManager)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})

f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
Expand Down Expand Up @@ -158,12 +152,6 @@ func (s *SyncManagerSuite) TestCompacted() {
manager := NewSyncManager(s.chunkManager)
task := s.getSuiteSyncTask()
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})

f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
Expand Down
90 changes: 46 additions & 44 deletions internal/flushcommon/syncmgr/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,58 +189,59 @@ func (s *SyncTaskSuite) TestRunNormal() {
}).Return()

s.Run("without_data", func() {
task := s.getSuiteSyncTask(new(SyncPack))
task := s.getSuiteSyncTask(new(SyncPack).WithCheckpoint(
&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})

err := task.Run(ctx)
s.NoError(err)
})

s.Run("with_insert_delete_cp", func() {
task := s.getSuiteSyncTask(
new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()}))
task.WithTimeRange(50, 100)
new(SyncPack).
WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})

err := task.Run(ctx)
s.NoError(err)
})

s.Run("with_flush", func() {
task := s.getSuiteSyncTask(
new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithFlush())
task.WithTimeRange(50, 100)
new(SyncPack).
WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).
WithFlush().
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run(ctx)
s.NoError(err)
})

s.Run("with_drop", func() {
s.metacache.EXPECT().RemoveSegments(mock.Anything, mock.Anything).Return(nil).Once()
task := s.getSuiteSyncTask(new(SyncPack).WithDeleteData(s.getDeleteBuffer()).WithDrop())
task.WithTimeRange(50, 100)
task := s.getSuiteSyncTask(new(SyncPack).
WithDeleteData(s.getDeleteBuffer()).
WithDrop().
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
err := task.Run(ctx)
s.NoError(err)
})
Expand All @@ -257,14 +258,15 @@ func (s *SyncTaskSuite) TestRunL0Segment() {
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()

s.Run("pure_delete_l0_flush", func() {
task := s.getSuiteSyncTask(new(SyncPack).WithDeleteData(s.getDeleteBuffer()).WithFlush())
task.WithTimeRange(50, 100)
task := s.getSuiteSyncTask(new(SyncPack).
WithDeleteData(s.getDeleteBuffer()).
WithFlush().
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})

err := task.Run(ctx)
s.NoError(err)
Expand Down Expand Up @@ -308,14 +310,13 @@ func (s *SyncTaskSuite) TestRunError() {
s.Run("metawrite_fail", func() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked"))

task := s.getSuiteSyncTask(new(SyncPack))
task := s.getSuiteSyncTask(new(SyncPack).
WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
}))
task.WithMetaWriter(BrokerMetaWriter(s.broker, 1, retry.Attempts(1)))
task.WithTimeRange(50, 100)
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})

err := task.Run(ctx)
s.Error(err)
Expand All @@ -327,8 +328,9 @@ func (s *SyncTaskSuite) TestRunError() {
s.chunkManager.ExpectedCalls = nil
s.chunkManager.EXPECT().RootPath().Return("files")
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked")))
task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})).WithFailureCallback(handler)
task.WithWriteRetryOptions(retry.Attempts(1))
task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})).
WithFailureCallback(handler).
WithWriteRetryOptions(retry.Attempts(1))

err := task.Run(ctx)

Expand Down
6 changes: 5 additions & 1 deletion internal/flushcommon/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,11 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy

metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)

task := syncmgr.NewSyncTask().WithAllocator(wb.allocator).WithMetaCache(wb.metaCache).WithSyncPack(pack)
task := syncmgr.NewSyncTask().
WithAllocator(wb.allocator).
WithMetaWriter(wb.metaWriter).
WithMetaCache(wb.metaCache).
WithSyncPack(pack)
return task, nil
}

Expand Down

0 comments on commit f220fc5

Please sign in to comment.