Skip to content

Commit

Permalink
fix: triggering blueprint concurrently might lead to deadlock (#6902)
Browse files Browse the repository at this point in the history
  • Loading branch information
klesh authored Feb 1, 2024
1 parent 1fc831b commit 9a9a5e5
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
10 changes: 6 additions & 4 deletions backend/server/services/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
txHelper := dbhelper.NewTxHelper(basicRes, &err)
tx := txHelper.Begin()
defer txHelper.End()
err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}})
err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{
{Table: "_devlake_pipelines", Exclusive: true},
{Table: "_devlake_tasks", Exclusive: true},
})
if err != nil {
err = errors.BadInput.Wrap(err, "failed to lock pipeline table, is there any pending pipeline or deletion?")
return
Expand Down Expand Up @@ -385,7 +388,6 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
}

// create new tasks
// TODO: this is better to be wrapped inside a transaction
rerunTasks := []*models.Task{}
for _, t := range failedTasks {
// mark previous task failed
Expand All @@ -395,7 +397,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
return nil, err
}
// create new task
rerunTask, err := CreateTask(&models.NewTask{
rerunTask, err := createTask(&models.NewTask{
PipelineTask: &models.PipelineTask{
Plugin: t.Plugin,
Subtasks: t.Subtasks,
Expand All @@ -405,7 +407,7 @@ func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task,
PipelineRow: t.PipelineRow,
PipelineCol: t.PipelineCol,
IsRerun: true,
})
}, tx)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion backend/server/services/pipeline_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
dal.LockTables{
{Table: "_devlake_pipelines", Exclusive: true},
{Table: "_devlake_pipeline_labels", Exclusive: true},
{Table: "_devlake_tasks", Exclusive: true},
},
))
if err != nil {
Expand Down Expand Up @@ -91,7 +92,7 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (pipeline *models.Pipelin
PipelineRow: i + 1,
PipelineCol: j + 1,
}
_ = errors.Must1(CreateTask(newTask))
_ = errors.Must1(createTask(newTask, tx))
// sync task state back to pipeline
dbPipeline.TotalTasks += 1
}
Expand Down
5 changes: 2 additions & 3 deletions backend/server/services/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ type TaskQuery struct {
Pending int `form:"pending"`
}

// CreateTask creates a new task
func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
func createTask(newTask *models.NewTask, tx dal.Transaction) (*models.Task, errors.Error) {
task := &models.Task{
Plugin: newTask.Plugin,
Subtasks: newTask.Subtasks,
Expand All @@ -55,7 +54,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
if newTask.IsRerun {
task.Status = models.TASK_RERUN
}
err := db.Create(task)
err := tx.Create(task)
if err != nil {
taskLog.Error(err, "save task failed")
return nil, errors.Internal.Wrap(err, "save task failed")
Expand Down

0 comments on commit 9a9a5e5

Please sign in to comment.