From 8c687d6b46d1ac2b5137f837493678f02e218a6a Mon Sep 17 00:00:00 2001 From: narro wizard Date: Tue, 22 Oct 2024 10:45:54 +0800 Subject: [PATCH] feat: not update sub task progress if progress less than 1 pct [Refactor][core]Data inflation when using postgres #8142 --- backend/core/runner/run_task.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/backend/core/runner/run_task.go b/backend/core/runner/run_task.go index 98b913f284b..9a57d21c8d3 100644 --- a/backend/core/runner/run_task.go +++ b/backend/core/runner/run_task.go @@ -20,10 +20,11 @@ package runner import ( gocontext "context" "fmt" - "github.com/apache/incubator-devlake/core/models/common" "strings" "time" + "github.com/apache/incubator-devlake/core/models/common" + "github.com/apache/incubator-devlake/core/context" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -354,6 +355,7 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta Model: common.Model{ID: taskId}, } subtask := &models.Subtask{} + originalFinishedRecords := progressDetail.FinishedRecords switch p.Type { case plugin.TaskSetProgress: progressDetail.TotalSubTasks = p.Total @@ -373,14 +375,22 @@ func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDeta case plugin.SetCurrentSubTask: progressDetail.SubTaskName = p.SubTaskName progressDetail.SubTaskNumber = p.SubTaskNumber + // reset finished records + progressDetail.FinishedRecords = 0 } - // update subtask progress - where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) - err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ - {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, - }, where) - if err != nil { - basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + currentFinishedRecords := progressDetail.FinishedRecords + currentTotalRecords := progressDetail.TotalRecords + // update progress if progress is more than 1% + // or there is progress if no total record provided + if (currentTotalRecords > 0 && float64(currentFinishedRecords-originalFinishedRecords)/float64(currentTotalRecords) > 0.01) || (currentTotalRecords <= 0 && currentFinishedRecords > originalFinishedRecords) { + // update subtask progress + where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName) + err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{ + {ColumnName: "finished_records", Value: progressDetail.FinishedRecords}, + }, where) + if err != nil { + basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress") + } } }