enhance: unify time in clustering compaction task to unix (milvus-io#35167)

milvus-io#34495

Signed-off-by: wayblink <[email protected]>
Signed-off-by: Sumit Dubey <[email protected]>
wayblink authored and sumitd2 committed Aug 6, 2024
1 parent ff8c83c commit 8b30b67
Showing 6 changed files with 15 additions and 26 deletions.
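The change moves every compaction-task timestamp from Unix milliseconds to Unix seconds. A minimal, illustrative Go sketch (not part of the patch) of the two units and of how reading one as the other skews duration math:

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	fmt.Println(now.Unix())      // seconds since the epoch, e.g. 1722902400
	fmt.Println(now.UnixMilli()) // milliseconds since the epoch, ~1000x larger

	// If a timestamp stored in seconds is read back as if it were milliseconds,
	// the reconstructed instant lands near 1970 and the computed duration explodes.
	start := now.Unix()
	wrong := time.Since(time.UnixMilli(start)).Seconds() // tens of years, in seconds
	right := time.Since(time.Unix(start, 0)).Seconds()   // ~0
	fmt.Println(wrong, right)
}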
2 changes: 1 addition & 1 deletion internal/datacoord/compaction.go
@@ -402,7 +402,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
- duration := time.Since(time.UnixMilli(task.StartTime)).Seconds()
+ duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
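For reference, a self-contained sketch of the cleanup check above after the change, assuming task.StartTime now holds Unix seconds; the tolerance here is a made-up stand-in for Params.DataCoordCfg.CompactionDropToleranceInSeconds:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical completed task that started two hours ago, StartTime in Unix seconds.
	startTime := time.Now().Add(-2 * time.Hour).Unix()

	// Same arithmetic as the patched cleanCompactionTaskMeta.
	duration := time.Since(time.Unix(startTime, 0)).Seconds()

	// Stand-in for the configured drop tolerance (one hour here, purely illustrative).
	dropTolerance := time.Hour.Seconds()

	fmt.Println(duration > dropTolerance) // true: the task meta would be dropped
}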
16 changes: 6 additions & 10 deletions internal/datacoord/compaction_task_clustering.go
@@ -87,21 +87,21 @@ func (t *clusteringCompactionTask) Process() bool {
// task state update, refresh retry times count
currentState := t.State.String()
if currentState != lastState {
- ts := time.Now().UnixMilli()
+ ts := time.Now().Unix()
lastStateDuration := ts - t.GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
- Observe(float64(lastStateDuration))
+ Observe(float64(lastStateDuration * 1000))
updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}

- if t.State == datapb.CompactionTaskState_completed {
+ if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
updateOps = append(updateOps, setEndTime(ts))
elapse := ts - t.StartTime
log.Info("clustering compaction task total elapse", zap.Int64("elapse", elapse))
log.Info("clustering compaction task total elapse", zap.Int64("elapse seconds", elapse))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
- Observe(float64(elapse))
+ Observe(float64(elapse * 1000))
}
err = t.updateAndSaveTaskMeta(updateOps...)
if err != nil {
@@ -561,10 +561,6 @@ func (t *clusteringCompactionTask) EndSpan() {
}
}

- func (t *clusteringCompactionTask) SetStartTime(startTime int64) {
- 	t.StartTime = startTime
- }

func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}
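The state timestamps are now kept in Unix seconds while the DataCoordCompactionLatency histogram is still fed milliseconds, hence the "* 1000" factor in the Process() hunk above. A small sketch of that conversion, with a hypothetical observeMillis standing in for the metric's Observe call:

package main

import (
	"fmt"
	"time"
)

// observeMillis stands in for DataCoordCompactionLatency.WithLabelValues(...).Observe.
func observeMillis(ms float64) { fmt.Printf("observed %.0f ms\n", ms) }

func main() {
	// Hypothetical: the previous state started 90 seconds ago, stored as Unix seconds.
	lastStateStartTime := time.Now().Add(-90 * time.Second).Unix()
	ts := time.Now().Unix()

	lastStateDuration := ts - lastStateStartTime     // seconds after this change
	observeMillis(float64(lastStateDuration * 1000)) // back to milliseconds for the metric
}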
4 changes: 0 additions & 4 deletions internal/datacoord/compaction_task_l0.go
@@ -201,10 +201,6 @@ func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}

- func (t *l0CompactionTask) SetStartTime(startTime int64) {
- 	t.StartTime = startTime
- }

func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
}
6 changes: 3 additions & 3 deletions internal/datacoord/compaction_test.go
@@ -769,19 +769,19 @@ func (s *CompactionPlanHandlerSuite) TestCompactionGC() {
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_completed,
- StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
+ StartTime: time.Now().Add(-time.Second * 100000).Unix(),
},
{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
- StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
+ StartTime: time.Now().Add(-time.Second * 100000).Unix(),
},
{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
- StartTime: time.Now().UnixMilli(),
+ StartTime: time.Now().Unix(),
},
}

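In the test above, a StartTime well past the drop tolerance is built by shifting the clock before converting to Unix seconds; an illustrative sketch showing this is equivalent to subtracting whole seconds from time.Now().Unix():

package main

import (
	"fmt"
	"time"
)

func main() {
	// Both expressions produce a Unix-seconds timestamp 100000 seconds in the past.
	a := time.Now().Add(-time.Second * 100000).Unix()
	b := time.Now().Unix() - 100000

	fmt.Println(a == b) // true, unless a second boundary is crossed between the two calls
}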
10 changes: 5 additions & 5 deletions internal/datacoord/compaction_trigger_v2.go
@@ -269,7 +269,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
TriggerID: taskID, // inner trigger, use task id as trigger id
PlanID: taskID,
Type: datapb.CompactionType_Level0DeleteCompaction,
- StartTime: time.Now().UnixMilli(),
+ StartTime: time.Now().Unix(),
InputSegments: levelZeroSegs,
State: datapb.CompactionTaskState_pipelining,
Channel: view.GetGroupLabel().Channel,
@@ -329,7 +329,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
PlanID: taskID,
TriggerID: view.(*ClusteringSegmentsView).triggerID,
State: datapb.CompactionTaskState_pipelining,
- StartTime: time.Now().UnixMilli(),
+ StartTime: time.Now().Unix(),
CollectionTtl: view.(*ClusteringSegmentsView).collectionTTL.Nanoseconds(),
TimeoutInSeconds: Params.DataCoordCfg.ClusteringCompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_ClusteringCompaction,
@@ -344,7 +344,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
PreferSegmentRows: preferSegmentRows,
TotalRows: totalRows,
AnalyzeTaskID: taskID + 1,
- LastStateStartTime: time.Now().UnixMilli(),
+ LastStateStartTime: time.Now().Unix(),
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
@@ -383,7 +383,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
PlanID: taskID,
TriggerID: view.(*MixSegmentView).triggerID,
State: datapb.CompactionTaskState_pipelining,
- StartTime: time.Now().UnixMilli(),
+ StartTime: time.Now().Unix(),
CollectionTtl: view.(*MixSegmentView).collectionTTL.Nanoseconds(),
TimeoutInSeconds: Params.DataCoordCfg.ClusteringCompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction, // todo: use SingleCompaction
@@ -394,7 +394,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
ResultSegments: []int64{taskID + 1},
TotalRows: totalRows,
- LastStateStartTime: time.Now().UnixMilli(),
+ LastStateStartTime: time.Now().Unix(),
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
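With this patch, every timestamp the trigger writes into a compaction task (StartTime, LastStateStartTime, and EndTime on completion) is in Unix seconds. A minimal sketch using a hypothetical struct that mirrors only those fields of datapb.CompactionTask:

package main

import (
	"fmt"
	"time"
)

// compactionTimes mirrors just the timestamp fields of datapb.CompactionTask, for illustration.
type compactionTimes struct {
	StartTime          int64 // Unix seconds
	LastStateStartTime int64 // Unix seconds
	EndTime            int64 // Unix seconds, set when the task completes or is cleaned
}

func main() {
	now := time.Now().Unix()
	t := compactionTimes{StartTime: now, LastStateStartTime: now}

	// ... the task runs through its states ...

	t.EndTime = time.Now().Unix()
	fmt.Println("total elapse seconds:", t.EndTime-t.StartTime)
}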
3 changes: 0 additions & 3 deletions internal/datanode/compaction/clustering_compactor_test.go
@@ -26,7 +26,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -35,7 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -72,7 +70,6 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.mockAlloc.EXPECT().Alloc(mock.Anything).RunAndReturn(func(x uint32) (int64, int64, error) {
start := s.mockID.Load()
end := s.mockID.Add(int64(x))
log.Info("wayblink", zap.Int64("start", start), zap.Int64("end", end))
return start, end, nil
}).Maybe()
s.mockAlloc.EXPECT().AllocOne().RunAndReturn(func() (int64, error) {
