From e6b7918ae8cfbfb330ea996b64dfcb39df7a1849 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 27 Apr 2022 01:45:51 +0200 Subject: [PATCH 1/4] lock pull on git&db actions ... --- services/pull/check.go | 2 ++ services/pull/merge.go | 7 ++++++- services/pull/pull.go | 7 +++++++ services/pull/update.go | 3 +++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/services/pull/check.go b/services/pull/check.go index 29dc88e0f0c1d..5e65b0371ccb0 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -314,6 +314,8 @@ func handle(data ...queue.Data) []queue.Data { } func testPR(id int64) { + pullWorkingPool.CheckIn(fmt.Sprint(id)) + defer pullWorkingPool.CheckOut(fmt.Sprint(id)) ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id)) defer finished() diff --git a/services/pull/merge.go b/services/pull/merge.go index 0c615d93c8516..602eb2236fa68 100644 --- a/services/pull/merge.go +++ b/services/pull/merge.go @@ -33,7 +33,6 @@ import ( // Merge merges pull request to base repository. // Caller should check PR is ready to be merged (review and status checks) -// FIXME: add repoWorkingPull make sure two merges does not happen at same time. func Merge(ctx context.Context, pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, mergeStyle repo_model.MergeStyle, expectedHeadCommitID, message string) (err error) { if err = pr.LoadHeadRepo(); err != nil { log.Error("LoadHeadRepo: %v", err) @@ -43,6 +42,9 @@ func Merge(ctx context.Context, pr *models.PullRequest, doer *user_model.User, b return fmt.Errorf("LoadBaseRepo: %v", err) } + pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) + defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests) if err != nil { log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err) @@ -722,6 +724,9 @@ func CheckPRReadyToMerge(ctx context.Context, pr *models.PullRequest, skipProtec // MergedManually mark pr as merged manually func MergedManually(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) (err error) { + pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) + defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests) if err != nil { return diff --git a/services/pull/pull.go b/services/pull/pull.go index f036211871322..1e1be28caf678 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -25,9 +25,13 @@ import ( "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/sync" issue_service "code.gitea.io/gitea/services/issue" ) +// TODO: use clustered lock (unique queue?) +var pullWorkingPool = sync.NewExclusivePool() + // NewPullRequest creates new pull request with labels for repository. func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *models.Issue, labelIDs []int64, uuids []string, pr *models.PullRequest, assigneeIDs []int64) error { if err := TestPatch(pr); err != nil { @@ -124,6 +128,9 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *mode // ChangeTargetBranch changes the target branch of this pull request, as the given user. func ChangeTargetBranch(ctx context.Context, pr *models.PullRequest, doer *user_model.User, targetBranch string) (err error) { + pullWorkingPool.CheckIn(fmt.Sprint(pr.ID)) + defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID)) + // Current target branch is already the same if pr.BaseBranch == targetBranch { return nil diff --git a/services/pull/update.go b/services/pull/update.go index 2ad58ecd29ac8..31015ce6f548f 100644 --- a/services/pull/update.go +++ b/services/pull/update.go @@ -23,6 +23,9 @@ func Update(ctx context.Context, pull *models.PullRequest, doer *user_model.User style repo_model.MergeStyle ) + pullWorkingPool.CheckIn(fmt.Sprint(pull.ID)) + defer pullWorkingPool.CheckOut(fmt.Sprint(pull.ID)) + if rebase { pr = pull style = repo_model.MergeStyleRebaseUpdate From 29319c46cedfa79e353ae59fd136f778f29a0f90 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 27 Apr 2022 01:51:03 +0200 Subject: [PATCH 2/4] add TODO notes --- services/pull/pull.go | 2 +- services/repository/transfer.go | 1 + services/wiki/wiki.go | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/services/pull/pull.go b/services/pull/pull.go index 1e1be28caf678..3b645e9de17ab 100644 --- a/services/pull/pull.go +++ b/services/pull/pull.go @@ -29,7 +29,7 @@ import ( issue_service "code.gitea.io/gitea/services/issue" ) -// TODO: use clustered lock (unique queue?) +// TODO: use clustered lock (unique queue? or *abuse* cache) var pullWorkingPool = sync.NewExclusivePool() // NewPullRequest creates new pull request with labels for repository. diff --git a/services/repository/transfer.go b/services/repository/transfer.go index 0abb03a88d930..3feeb68f223de 100644 --- a/services/repository/transfer.go +++ b/services/repository/transfer.go @@ -19,6 +19,7 @@ import ( ) // repoWorkingPool represents a working pool to order the parallel changes to the same repository +// TODO: use clustered lock (unique queue? or *abuse* cache) var repoWorkingPool = sync.NewExclusivePool() // TransferOwnership transfers all corresponding setting from old user to new one. diff --git a/services/wiki/wiki.go b/services/wiki/wiki.go index 454f54983c107..796291fd38098 100644 --- a/services/wiki/wiki.go +++ b/services/wiki/wiki.go @@ -27,7 +27,8 @@ import ( var ( reservedWikiNames = []string{"_pages", "_new", "_edit", "raw"} - wikiWorkingPool = sync.NewExclusivePool() + // TODO: use clustered lock (unique queue? or *abuse* cache) + wikiWorkingPool = sync.NewExclusivePool() ) func nameAllowed(name string) error { From 446b79ec01103abf16b33fdc176b28eddd3d4705 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 27 Apr 2022 01:09:42 +0200 Subject: [PATCH 3/4] rename prQueue 2 prPatchCheckerQueue --- services/pull/check.go | 16 ++++++++-------- services/pull/check_test.go | 10 +++++----- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/services/pull/check.go b/services/pull/check.go index 5e65b0371ccb0..fd7caf6e2532a 100644 --- a/services/pull/check.go +++ b/services/pull/check.go @@ -28,8 +28,8 @@ import ( asymkey_service "code.gitea.io/gitea/services/asymkey" ) -// prQueue represents a queue to handle update pull request tests -var prQueue queue.UniqueQueue +// prPatchCheckerQueue represents a queue to handle update pull request tests +var prPatchCheckerQueue queue.UniqueQueue var ( ErrIsClosed = errors.New("pull is cosed") @@ -43,7 +43,7 @@ var ( // AddToTaskQueue adds itself to pull request test task queue. func AddToTaskQueue(pr *models.PullRequest) { - err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error { + err := prPatchCheckerQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error { pr.Status = models.PullRequestStatusChecking err := pr.UpdateColsIfNotMerged("status") if err != nil { @@ -144,7 +144,7 @@ func checkAndUpdateStatus(pr *models.PullRequest) { } // Make sure there is no waiting test to process before leaving the checking status. - has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10)) + has, err := prPatchCheckerQueue.Has(strconv.FormatInt(pr.ID, 10)) if err != nil { log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err) } @@ -293,7 +293,7 @@ func InitializePullRequests(ctx context.Context) { case <-ctx.Done(): return default: - if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error { + if err := prPatchCheckerQueue.PushFunc(strconv.FormatInt(prID, 10), func() error { log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID) return nil }); err != nil { @@ -360,13 +360,13 @@ func CheckPrsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin // Init runs the task queue to test all the checking status pull requests func Init() error { - prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "") + prPatchCheckerQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "") - if prQueue == nil { + if prPatchCheckerQueue == nil { return fmt.Errorf("Unable to create pr_patch_checker Queue") } - go graceful.GetManager().RunWithShutdownFns(prQueue.Run) + go graceful.GetManager().RunWithShutdownFns(prPatchCheckerQueue.Run) go graceful.GetManager().RunWithShutdownContext(InitializePullRequests) return nil } diff --git a/services/pull/check_test.go b/services/pull/check_test.go index 65bcb9c0e44db..bc4c45ffada1c 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -41,7 +41,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { queueShutdown := []func(){} queueTerminate := []func(){} - prQueue = q.(queue.UniqueQueue) + prPatchCheckerQueue = q.(queue.UniqueQueue) pr := unittest.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 2}).(*models.PullRequest) AddToTaskQueue(pr) @@ -51,11 +51,11 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { return pr.Status == models.PullRequestStatusChecking }, 1*time.Second, 100*time.Millisecond) - has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10)) + has, err := prPatchCheckerQueue.Has(strconv.FormatInt(pr.ID, 10)) assert.True(t, has) assert.NoError(t, err) - prQueue.Run(func(shutdown func()) { + prPatchCheckerQueue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) @@ -68,7 +68,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { assert.Fail(t, "Timeout: nothing was added to pullRequestQueue") } - has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10)) + has, err = prPatchCheckerQueue.Has(strconv.FormatInt(pr.ID, 10)) assert.False(t, has) assert.NoError(t, err) @@ -82,5 +82,5 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { callback() } - prQueue = nil + prPatchCheckerQueue = nil } From 1d07b56ee77b9f839d1d22f62cef8ba4be21138b Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Tue, 3 May 2022 23:45:17 +0200 Subject: [PATCH 4/4] fmt --- services/pull/merge.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/pull/merge.go b/services/pull/merge.go index 80970b41ce9a9..fe295cbe03ac2 100644 --- a/services/pull/merge.go +++ b/services/pull/merge.go @@ -35,7 +35,7 @@ import ( // Merge merges pull request to base repository. // Caller should check PR is ready to be merged (review and status checks) func Merge(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, mergeStyle repo_model.MergeStyle, expectedHeadCommitID, message string) error { - if err := pr.LoadHeadRepo(); err != nil { + if err := pr.LoadHeadRepo(); err != nil { log.Error("LoadHeadRepo: %v", err) return fmt.Errorf("LoadHeadRepo: %v", err) } else if err := pr.LoadBaseRepo(); err != nil {