Skip to content

Commit

Permalink
Fix clustering compaction task leak
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Oct 12, 2024
1 parent 1d9c746 commit f158228
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 0 deletions.
12 changes: 12 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (t *clusteringCompactionTask) processExecuting() error {
}

func (t *clusteringCompactionTask) processMetaSaved() error {
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing))
}

Expand Down Expand Up @@ -363,6 +368,13 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))

if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}

// revert segments meta
var operators []UpdateOperator
// revert level of input segments
Expand Down
6 changes: 6 additions & 0 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
},
})
s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)

task := s.generateBasicTask(false)

Expand Down Expand Up @@ -370,6 +371,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
})
Expand Down Expand Up @@ -403,6 +405,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
// DropCompactionPlan fail
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(merr.WrapErrNodeNotFound(1)).Once()
s.Equal(false, task.Process())
s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
})
Expand Down Expand Up @@ -438,6 +442,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()

time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
Expand Down
2 changes: 2 additions & 0 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()).
Observe(float64(t.tr.ElapseSpan().Milliseconds()))
log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load()))
// clear the buffer cache
t.keyToBufferFunc = nil

return planResult, nil
}
Expand Down

0 comments on commit f158228

Please sign in to comment.