Skip to content

Commit

Permalink
fix: Fix empty import task result (milvus-io#38316)
Browse files Browse the repository at this point in the history
Ensure the idempotency of import tasks to prevent duplicate tasks in
DataNode.

issue: milvus-io#38313

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 11, 2024
1 parent 37a5228 commit 43e0e2b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
7 changes: 4 additions & 3 deletions internal/datacoord/import_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
defer cancel()
for size > 0 {
segmentInfo, err := AllocImportSegment(ctx, alloc, meta,
task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
task.GetJobID(), task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
if err != nil {
return err
}
Expand All @@ -180,8 +180,8 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
func AllocImportSegment(ctx context.Context,
alloc allocator.Allocator,
meta *meta,
taskID int64, collectionID UniqueID,
partitionID UniqueID,
jobID int64, taskID int64,
collectionID UniqueID, partitionID UniqueID,
channelName string,
level datapb.SegmentLevel,
) (*SegmentInfo, error) {
Expand Down Expand Up @@ -221,6 +221,7 @@ func AllocImportSegment(ctx context.Context,
return nil, err
}
log.Info("add import segment done",
zap.Int64("jobID", jobID),
zap.Int64("taskID", taskID),
zap.Int64("collectionID", segmentInfo.CollectionID),
zap.Int64("segmentID", segmentInfo.ID),
Expand Down
6 changes: 6 additions & 0 deletions internal/datanode/importv2/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package importv2

import (
"sync"

"github.com/milvus-io/milvus/pkg/log"
)

type TaskManager interface {
Expand All @@ -42,6 +44,10 @@ func NewTaskManager() TaskManager {
// Add registers task in the manager under its task ID.
//
// The call is idempotent with respect to the task ID: when an entry with the
// same ID is already present, the existing entry is left untouched, a
// "duplicated task" warning is logged, and the new task is dropped.
// The manager's mutex is held for the duration of the call.
func (m *taskManager) Add(task Task) {
	m.mu.Lock()
	defer m.mu.Unlock()

	id := task.GetTaskID()
	if _, exists := m.tasks[id]; !exists {
		// First time this ID is seen: store it and finish.
		m.tasks[id] = task
		return
	}
	// Same ID submitted again; keep the original entry.
	log.Warn("duplicated task", WrapLogFields(task)...)
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datanode/importv2/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func TestImportManager(t *testing.T) {
assert.Equal(t, 1, len(tasks))
assert.Equal(t, task2.GetTaskID(), tasks[0].GetTaskID())

// check idempotency
manager.Add(task2)
tasks = manager.GetBy(WithStates(datapb.ImportTaskStateV2_Completed))
assert.Equal(t, 1, len(tasks))
assert.Equal(t, task2.GetTaskID(), tasks[0].GetTaskID())
assert.True(t, task2 == tasks[0])

manager.Update(task1.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed))
task := manager.Get(task1.GetTaskID())
assert.Equal(t, datapb.ImportTaskStateV2_Failed, task.GetState())
Expand Down

0 comments on commit 43e0e2b

Please sign in to comment.