From 27ba322d399db66aff64bec9e2c33953c94e0c1a Mon Sep 17 00:00:00 2001 From: Klesh Wong Date: Thu, 8 Feb 2024 19:34:13 +0800 Subject: [PATCH] fix: rerun pipeline deadlock (#6939) --- backend/server/api/task/task.go | 7 ++++--- backend/server/services/pipeline.go | 2 +- backend/server/services/task.go | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/backend/server/api/task/task.go b/backend/server/api/task/task.go index 07fe7b8d458..bb00fd8c678 100644 --- a/backend/server/api/task/task.go +++ b/backend/server/api/task/task.go @@ -18,12 +18,13 @@ limitations under the License. package task import ( + "net/http" + "strconv" + "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/models" "github.com/apache/incubator-devlake/server/api/shared" "github.com/apache/incubator-devlake/server/services" - "net/http" - "strconv" "github.com/gin-gonic/gin" ) @@ -63,7 +64,7 @@ func GetTaskByPipeline(c *gin.Context) { shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format")) return } - tasks, err := services.GetTasksWithLastStatus(pipelineId) + tasks, err := services.GetTasksWithLastStatus(pipelineId, nil) if err != nil { shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks")) return diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index c8c9191d517..afa080ec01b 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -371,7 +371,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, } failedTasks = append(failedTasks, task) } else { - tasks, err := GetTasksWithLastStatus(pipelineId) + tasks, err := GetTasksWithLastStatus(pipelineId, tx) if err != nil { return nil, errors.Default.Wrap(err, "error getting tasks") } diff --git a/backend/server/services/task.go b/backend/server/services/task.go index eaf10120d42..3624145575d 100644 --- a/backend/server/services/task.go +++ b/backend/server/services/task.go @@ -110,9 +110,9 @@ func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) { // GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned // TODO: adopts GetLatestTasksOfPipeline -func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) { +func GetTasksWithLastStatus(pipelineId uint64, tx dal.Dal) ([]*models.Task, errors.Error) { var tasks []*models.Task - err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC")) + err := tx.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC")) if err != nil { return nil, err }