Skip to content

Commit

Permalink
fix: Use the correct RootPath when decompressing binlog in stats task (
Browse files Browse the repository at this point in the history
…milvus-io#38341)

issue: milvus-io#38336

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 11, 2024
1 parent d3ae8e9 commit 0d7a89a
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 47 deletions.
14 changes: 7 additions & 7 deletions internal/datacoord/import_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (c *importChecker) checkImportingJob(job ImportJob) {

func (c *importChecker) checkStatsJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
updateJobState := func(state internalpb.ImportJobState) {
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(state))
updateJobState := func(state internalpb.ImportJobState, reason string) {
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(state), UpdateJobReason(reason))
if err != nil {
log.Warn("failed to update job state", zap.Error(err))
return
Expand All @@ -290,7 +290,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {

// Skip stats stage if not enable stats or is l0 import.
if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() || importutilv2.IsL0Import(job.GetOptions()) {
updateJobState(internalpb.ImportJobState_IndexBuilding)
updateJobState(internalpb.ImportJobState_IndexBuilding, "")
return
}

Expand All @@ -306,8 +306,8 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
taskCnt += len(originSegmentIDs)
for i, originSegmentID := range originSegmentIDs {
taskLogFields := WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))
state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort)
switch state {
t := c.sjm.GetStatsTask(originSegmentID, indexpb.StatsSubJob_Sort)
switch t.GetState() {
case indexpb.JobState_JobStateNone:
err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false)
if err != nil {
Expand All @@ -319,7 +319,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
log.Debug("waiting for stats task...", taskLogFields...)
case indexpb.JobState_JobStateFailed:
log.Warn("import job stats failed", taskLogFields...)
updateJobState(internalpb.ImportJobState_Failed)
updateJobState(internalpb.ImportJobState_Failed, t.GetFailReason())
return
case indexpb.JobState_JobStateFinished:
doneCnt++
Expand All @@ -329,7 +329,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {

// All segments are stats-ed. Update job state to `IndexBuilding`.
if taskCnt == doneCnt {
updateJobState(internalpb.ImportJobState_IndexBuilding)
updateJobState(internalpb.ImportJobState_IndexBuilding, "")
}
}

Expand Down
12 changes: 9 additions & 3 deletions internal/datacoord/import_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,22 @@ func (s *ImportCheckerSuite) TestCheckJob() {
alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil).Maybe()
sjm := s.checker.sjm.(*MockStatsJobManager)
sjm.EXPECT().SubmitStatsTask(mock.Anything, mock.Anything, mock.Anything, false).Return(nil)
sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateNone)
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateNone,
})
s.checker.checkStatsJob(job)
s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
sjm = NewMockStatsJobManager(s.T())
sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateInProgress)
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateInProgress,
})
s.checker.sjm = sjm
s.checker.checkStatsJob(job)
s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
sjm = NewMockStatsJobManager(s.T())
sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateFinished)
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateFinished,
})
s.checker.sjm = sjm
s.checker.checkStatsJob(job)
s.Equal(internalpb.ImportJobState_IndexBuilding, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/import_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsJobManager) float3
}
doneCnt := 0
for _, originSegmentID := range originSegmentIDs {
state := sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort)
if state == indexpb.JobState_JobStateFinished {
t := sjm.GetStatsTask(originSegmentID, indexpb.StatsSubJob_Sort)
if t.GetState() == indexpb.JobState_JobStateFinished {
doneCnt++
}
}
Expand Down
14 changes: 10 additions & 4 deletions internal/datacoord/import_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
assert.NoError(t, err)
sjm := NewMockStatsJobManager(t)
sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) indexpb.JobState {
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) *indexpb.StatsTask {
if lo.Contains([]int64{10, 11, 12}, segmentID) {
return indexpb.JobState_JobStateFinished
return &indexpb.StatsTask{
State: indexpb.JobState_JobStateFinished,
}
}
return &indexpb.StatsTask{
State: indexpb.JobState_JobStateInProgress,
}
return indexpb.JobState_JobStateInProgress
})
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
assert.Equal(t, int64(10+30+30+10*0.5), progress)
Expand All @@ -635,7 +639,9 @@ func TestImportUtil_GetImportProgress(t *testing.T) {

// stats state, len(statsSegmentIDs) / (len(originalSegmentIDs) = 1
sjm = NewMockStatsJobManager(t)
sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateFinished)
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateFinished,
})
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
assert.Equal(t, int64(10+30+30+10), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
Expand Down
11 changes: 6 additions & 5 deletions internal/datacoord/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type StatsJobManager interface {
Start()
Stop()
SubmitStatsTask(originSegmentID, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool) error
GetStatsTaskState(originSegmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState
GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask
DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error
}

Expand Down Expand Up @@ -264,11 +264,12 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6
return nil
}

func (jm *statsJobManager) GetStatsTaskState(originSegmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState {
state := jm.mt.statsTaskMeta.GetStatsTaskStateBySegmentID(originSegmentID, subJobType)
func (jm *statsJobManager) GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask {
task := jm.mt.statsTaskMeta.GetStatsTaskBySegmentID(originSegmentID, subJobType)
log.Info("statsJobManager get stats task state", zap.Int64("segmentID", originSegmentID),
zap.String("subJobType", subJobType.String()), zap.String("state", state.String()))
return state
zap.String("subJobType", subJobType.String()), zap.String("state", task.GetState().String()),
zap.String("failReason", task.GetFailReason()))
return task
}

func (jm *statsJobManager) DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error {
Expand Down
30 changes: 16 additions & 14 deletions internal/datacoord/mock_job_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions internal/datacoord/mock_segment_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/indexnode/task_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ func (st *statsTask) PreExecute(ctx context.Context) error {
zap.Int64("segmentID", st.req.GetSegmentID()),
)

if err := binlog.DecompressBinLog(storage.InsertBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(),
if err := binlog.DecompressBinLogWithRootPath(st.req.GetStorageConfig().GetRootPath(), storage.InsertBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(),
st.req.GetSegmentID(), st.req.GetInsertLogs()); err != nil {
log.Warn("Decompress insert binlog error", zap.Error(err))
return err
}

if err := binlog.DecompressBinLog(storage.DeleteBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(),
if err := binlog.DecompressBinLogWithRootPath(st.req.GetStorageConfig().GetRootPath(), storage.DeleteBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(),
st.req.GetSegmentID(), st.req.GetDeltaLogs()); err != nil {
log.Warn("Decompress delta binlog error", zap.Error(err))
return err
Expand Down
22 changes: 22 additions & 0 deletions internal/metastore/kv/binlog/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,34 @@ func DecompressBinLog(binlogType storage.BinlogType, collectionID, partitionID,
return nil
}

func DecompressBinLogWithRootPath(rootPath string, binlogType storage.BinlogType, collectionID, partitionID,
segmentID typeutil.UniqueID, fieldBinlogs []*datapb.FieldBinlog,
) error {
for _, fieldBinlog := range fieldBinlogs {
for _, binlog := range fieldBinlog.Binlogs {
if binlog.GetLogPath() == "" {
path, err := BuildLogPathWithRootPath(rootPath, binlogType, collectionID, partitionID,
segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID())
if err != nil {
return err
}
binlog.LogPath = path
}
}
}
return nil
}

// build a binlog path on the storage by metadata
func BuildLogPath(binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) {
chunkManagerRootPath := paramtable.Get().MinioCfg.RootPath.GetValue()
if paramtable.Get().CommonCfg.StorageType.GetValue() == "local" {
chunkManagerRootPath = paramtable.Get().LocalStorageCfg.Path.GetValue()
}
return BuildLogPathWithRootPath(chunkManagerRootPath, binlogType, collectionID, partitionID, segmentID, fieldID, logID)
}

func BuildLogPathWithRootPath(chunkManagerRootPath string, binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) {
switch binlogType {
case storage.InsertBinlog:
return metautil.BuildInsertLogPath(chunkManagerRootPath, collectionID, partitionID, segmentID, fieldID, logID), nil
Expand Down

0 comments on commit 0d7a89a

Please sign in to comment.