From c7cd0ad0476d746ef640336bc03bcae20178690b Mon Sep 17 00:00:00 2001 From: Aayush Gupta <43479002+Aayyush@users.noreply.github.com> Date: Thu, 16 Sep 2021 10:06:11 -0700 Subject: [PATCH] Log streaming resource cleanup (#102) --- .../events/events_controller_e2e_test.go | 11 +- server/events/mocks/mock_pull_cleaner.go | 12 +- server/events/models/fixtures/fixtures.go | 16 +- server/events/models/models.go | 6 +- server/events/pull_closed_executor.go | 32 +++- server/events/pull_closed_executor_test.go | 124 ++++++++++++++- server/events/yaml/valid/repo_cfg.go | 5 + server/feature/allocator.go | 2 +- .../handlers/mocks/matchers/chan_of_string.go | 15 +- .../mocks/matchers/models_commandname.go | 15 +- .../mocks/matchers/models_commitstatus.go | 15 +- .../matchers/models_projectcommandcontext.go | 15 +- .../mock_project_command_output_handler.go | 145 +++++++++++------- .../project_command_output_handler.go | 25 ++- server/server.go | 35 +++-- 15 files changed, 369 insertions(+), 104 deletions(-) diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index 604a8970d..4cd3cb061 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -912,11 +912,12 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl TestingMode: true, CommandRunner: commandRunner, PullCleaner: &events.PullClosedExecutor{ - Locker: lockingClient, - VCSClient: e2eVCSClient, - WorkingDir: workingDir, - DB: boltdb, - PullClosedTemplate: &events.PullClosedEventTemplate{}, + Locker: lockingClient, + VCSClient: e2eVCSClient, + WorkingDir: workingDir, + DB: boltdb, + PullClosedTemplate: &events.PullClosedEventTemplate{}, + LogStreamResourceCleaner: projectCmdOutputHandler, }, Logger: logger, Parser: eventParser, diff --git a/server/events/mocks/mock_pull_cleaner.go b/server/events/mocks/mock_pull_cleaner.go index 2288c67d7..1691f8738 100644 --- a/server/events/mocks/mock_pull_cleaner.go +++ b/server/events/mocks/mock_pull_cleaner.go @@ -25,11 +25,11 @@ func NewMockPullCleaner(options ...pegomock.Option) *MockPullCleaner { func (mock *MockPullCleaner) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockPullCleaner) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockPullCleaner) CleanUpPull(repo models.Repo, pull models.PullRequest) error { +func (mock *MockPullCleaner) CleanUpPull(_param0 models.Repo, _param1 models.PullRequest) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockPullCleaner().") } - params := []pegomock.Param{repo, pull} + params := []pegomock.Param{_param0, _param1} result := pegomock.GetGenericMockFrom(mock).Invoke("CleanUpPull", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) var ret0 error if len(result) != 0 { @@ -77,8 +77,8 @@ type VerifierMockPullCleaner struct { timeout time.Duration } -func (verifier *VerifierMockPullCleaner) CleanUpPull(repo models.Repo, pull models.PullRequest) *MockPullCleaner_CleanUpPull_OngoingVerification { - params := []pegomock.Param{repo, pull} +func (verifier *VerifierMockPullCleaner) CleanUpPull(_param0 models.Repo, _param1 models.PullRequest) *MockPullCleaner_CleanUpPull_OngoingVerification { + params := []pegomock.Param{_param0, _param1} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUpPull", params, verifier.timeout) return &MockPullCleaner_CleanUpPull_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -89,8 +89,8 @@ type MockPullCleaner_CleanUpPull_OngoingVerification struct { } func (c *MockPullCleaner_CleanUpPull_OngoingVerification) GetCapturedArguments() (models.Repo, models.PullRequest) { - repo, pull := c.GetAllCapturedArguments() - return repo[len(repo)-1], pull[len(pull)-1] + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] } func (c *MockPullCleaner_CleanUpPull_OngoingVerification) GetAllCapturedArguments() (_param0 []models.Repo, _param1 []models.PullRequest) { diff --git a/server/events/models/fixtures/fixtures.go b/server/events/models/fixtures/fixtures.go index 8aec38b4f..ff5e732fb 100644 --- a/server/events/models/fixtures/fixtures.go +++ b/server/events/models/fixtures/fixtures.go @@ -13,7 +13,12 @@ package fixtures -import "github.com/runatlantis/atlantis/server/events/models" +import ( + "fmt" + + "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/events/yaml/valid" +) var Pull = models.PullRequest{ Num: 1, @@ -21,6 +26,7 @@ var Pull = models.PullRequest{ HeadBranch: "branch", Author: "lkysow", URL: "url", + BaseRepo: GithubRepo, } var GithubRepo = models.Repo{ @@ -50,3 +56,11 @@ var GitlabRepo = models.Repo{ var User = models.User{ Username: "lkysow", } + +var projectName = "test-project" + +var Project = valid.Project{ + Name: &projectName, +} + +var PullInfo = fmt.Sprintf("%s/%d/%s", GithubRepo.FullName, Pull.Num, *Project.Name) diff --git a/server/events/models/models.go b/server/events/models/models.go index 81242a6fd..40ad4fdb1 100644 --- a/server/events/models/models.go +++ b/server/events/models/models.go @@ -447,7 +447,11 @@ func (p ProjectCommandContext) GetShowResultFileName() string { // Gets a unique identifier for the current pull request as a single string func (p ProjectCommandContext) PullInfo() string { - return fmt.Sprintf("%s/%d/%s", p.BaseRepo.FullName, p.Pull.Num, p.ProjectName) + return BuildPullInfo(p.BaseRepo.FullName, p.Pull.Num, p.ProjectName) +} + +func BuildPullInfo(repoName string, pullNum int, projectName string) string { + return fmt.Sprintf("%s/%d/%s", repoName, pullNum, projectName) } // SplitRepoFullName splits a repo full name up into its owner and repo diff --git a/server/events/pull_closed_executor.go b/server/events/pull_closed_executor.go index c9399eb81..3e21f730d 100644 --- a/server/events/pull_closed_executor.go +++ b/server/events/pull_closed_executor.go @@ -29,6 +29,7 @@ import ( "github.com/runatlantis/atlantis/server/events/locking" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/vcs" + "github.com/runatlantis/atlantis/server/handlers" ) //go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_pull_cleaner.go PullCleaner @@ -43,12 +44,13 @@ type PullCleaner interface { // PullClosedExecutor executes the tasks required to clean up a closed pull // request. type PullClosedExecutor struct { - Locker locking.Locker - VCSClient vcs.Client - WorkingDir WorkingDir - Logger logging.SimpleLogging - DB *db.BoltDB - PullClosedTemplate PullCleanupTemplate + Locker locking.Locker + Logger logging.SimpleLogging + DB *db.BoltDB + PullClosedTemplate PullCleanupTemplate + LogStreamResourceCleaner handlers.ResourceCleaner + VCSClient vcs.Client + WorkingDir WorkingDir } type templatedProject struct { @@ -73,6 +75,24 @@ var pullClosedTemplate = template.Must(template.New("").Parse( // CleanUpPull cleans up after a closed pull request. func (p *PullClosedExecutor) CleanUpPull(repo models.Repo, pull models.PullRequest) error { + pullStatus, err := p.DB.GetPullStatus(pull) + if err != nil { + // Log and continue to clean up other resources. + p.Logger.Err("retrieving pull status: %s", err) + } + + if pullStatus != nil { + for _, project := range pullStatus.Projects { + // TODO [ORCA-943]: Set projectName to "/" when project name is not set. + // Upstream atlantis only requires project name to be set if there's more than one project + // with same dir and workspace. If a project name has not been set, we'll use the dir and + // workspace to build project key. + // Source: https://www.runatlantis.io/docs/repo-level-atlantis-yaml.html#reference + projectKey := models.BuildPullInfo(pullStatus.Pull.BaseRepo.FullName, pull.Num, project.ProjectName) + p.LogStreamResourceCleaner.CleanUp(projectKey) + } + } + if err := p.WorkingDir.Delete(repo, pull); err != nil { return errors.Wrap(err, "cleaning workspace") } diff --git a/server/events/pull_closed_executor_test.go b/server/events/pull_closed_executor_test.go index 7321e385a..266557455 100644 --- a/server/events/pull_closed_executor_test.go +++ b/server/events/pull_closed_executor_test.go @@ -14,10 +14,15 @@ package events_test import ( - "errors" + "io/ioutil" "testing" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" + "github.com/runatlantis/atlantis/server/events/db" + "github.com/runatlantis/atlantis/server/handlers" + "github.com/stretchr/testify/assert" . "github.com/petergtz/pegomock" "github.com/runatlantis/atlantis/server/events" @@ -27,6 +32,8 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/models/fixtures" vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks" + handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" + loggermocks "github.com/runatlantis/atlantis/server/logging/mocks" . "github.com/runatlantis/atlantis/testing" ) @@ -34,11 +41,16 @@ func TestCleanUpPullWorkspaceErr(t *testing.T) { t.Log("when workspace.Delete returns an error, we return it") RegisterMockTestingT(t) w := mocks.NewMockWorkingDir() + tmp, cleanup := TempDir(t) + defer cleanup() + db, err := db.New(tmp) + Ok(t, err) pce := events.PullClosedExecutor{ WorkingDir: w, PullClosedTemplate: &events.PullClosedEventTemplate{}, + DB: db, } - err := errors.New("err") + err = errors.New("err") When(w.Delete(fixtures.GithubRepo, fixtures.Pull)).ThenReturn(err) actualErr := pce.CleanUpPull(fixtures.GithubRepo, fixtures.Pull) Equals(t, "cleaning workspace: err", actualErr.Error()) @@ -49,12 +61,17 @@ func TestCleanUpPullUnlockErr(t *testing.T) { RegisterMockTestingT(t) w := mocks.NewMockWorkingDir() l := lockmocks.NewMockLocker() + tmp, cleanup := TempDir(t) + defer cleanup() + db, err := db.New(tmp) + Ok(t, err) pce := events.PullClosedExecutor{ Locker: l, WorkingDir: w, + DB: db, PullClosedTemplate: &events.PullClosedEventTemplate{}, } - err := errors.New("err") + err = errors.New("err") When(l.UnlockByPull(fixtures.GithubRepo.FullName, fixtures.Pull.Num)).ThenReturn(nil, err) actualErr := pce.CleanUpPull(fixtures.GithubRepo, fixtures.Pull) Equals(t, "cleaning up locks: err", actualErr.Error()) @@ -72,7 +89,6 @@ func TestCleanUpPullNoLocks(t *testing.T) { Ok(t, err) pce := events.PullClosedExecutor{ Locker: l, - VCSClient: cp, WorkingDir: w, DB: db, PullClosedTemplate: &events.PullClosedEventTemplate{}, @@ -150,18 +166,18 @@ func TestCleanUpPullComments(t *testing.T) { } for _, c := range cases { func() { - w := mocks.NewMockWorkingDir() cp := vcsmocks.NewMockClient() l := lockmocks.NewMockLocker() + w := mocks.NewMockWorkingDir() tmp, cleanup := TempDir(t) defer cleanup() db, err := db.New(tmp) Ok(t, err) pce := events.PullClosedExecutor{ Locker: l, + DB: db, VCSClient: cp, WorkingDir: w, - DB: db, PullClosedTemplate: &events.PullClosedEventTemplate{}, } t.Log("testing: " + c.Description) @@ -175,3 +191,99 @@ func TestCleanUpPullComments(t *testing.T) { }() } } + +func TestCleanUpLogStreaming(t *testing.T) { + RegisterMockTestingT(t) + + t.Run("Should Clean Up Log Streaming Resources When PR is closed", func(t *testing.T) { + prjStatusUpdater := handlermocks.NewMockProjectStatusUpdater() + prjJobURLGenerator := handlermocks.NewMockProjectJobURLGenerator() + + // Create Log streaming resources + prjCmdOutput := make(chan *models.ProjectCmdOutputLine) + prjCmdOutHandler := handlers.NewAsyncProjectCommandOutputHandler(prjCmdOutput, prjStatusUpdater, prjJobURLGenerator, logger) + ctx := models.ProjectCommandContext{ + BaseRepo: fixtures.GithubRepo, + Pull: fixtures.Pull, + ProjectName: *fixtures.Project.Name, + } + + go prjCmdOutHandler.Handle() + prjCmdOutHandler.Send(ctx, "Test Message") + + // Create boltdb and add pull request. + var lockBucket = "bucket" + var configBucket = "configBucket" + var pullsBucketName = "pulls" + + f, err := ioutil.TempFile("", "") + if err != nil { + panic(errors.Wrap(err, "failed to create temp file")) + } + path := f.Name() + f.Close() // nolint: errcheck + + // Open the database. + boltDB, err := bolt.Open(path, 0600, nil) + if err != nil { + panic(errors.Wrap(err, "could not start bolt DB")) + } + if err := boltDB.Update(func(tx *bolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists([]byte(pullsBucketName)); err != nil { + return errors.Wrap(err, "failed to create bucket") + } + return nil + }); err != nil { + panic(errors.Wrap(err, "could not create bucket")) + } + db, _ := db.NewWithDB(boltDB, lockBucket, configBucket) + result := []models.ProjectResult{ + { + RepoRelDir: fixtures.GithubRepo.FullName, + Workspace: "default", + ProjectName: *fixtures.Project.Name, + }, + } + + // Create a new record for pull + _, err = db.UpdatePullWithResults(fixtures.Pull, result) + Ok(t, err) + + workingDir := mocks.NewMockWorkingDir() + locker := lockmocks.NewMockLocker() + client := vcsmocks.NewMockClient() + logger := loggermocks.NewMockSimpleLogging() + + pullClosedExecutor := events.PullClosedExecutor{ + Locker: locker, + WorkingDir: workingDir, + DB: db, + VCSClient: client, + PullClosedTemplate: &events.PullClosedEventTemplate{}, + LogStreamResourceCleaner: prjCmdOutHandler, + Logger: logger, + } + + locks := []models.ProjectLock{ + { + Project: models.NewProject(fixtures.GithubRepo.FullName, ""), + Workspace: "default", + }, + } + When(locker.UnlockByPull(fixtures.GithubRepo.FullName, fixtures.Pull.Num)).ThenReturn(locks, nil) + + // Clean up. + err = pullClosedExecutor.CleanUpPull(fixtures.GithubRepo, fixtures.Pull) + Ok(t, err) + + close(prjCmdOutput) + _, _, comment, _ := client.VerifyWasCalledOnce().CreateComment(matchers.AnyModelsRepo(), AnyInt(), AnyString(), AnyString()).GetCapturedArguments() + expectedComment := "Locks and plans deleted for the projects and workspaces modified in this pull request:\n\n" + "- dir: `.` workspace: `default`" + Equals(t, expectedComment, comment) + + // Assert log streaming resources are cleaned up. + dfPrjCmdOutputHandler := prjCmdOutHandler.(*handlers.AsyncProjectCommandOutputHandler) + assert.Empty(t, dfPrjCmdOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) + assert.Empty(t, dfPrjCmdOutputHandler.GetReceiverBufferForPull(ctx.PullInfo())) + }) +} diff --git a/server/events/yaml/valid/repo_cfg.go b/server/events/yaml/valid/repo_cfg.go index 02f7188d1..ac109859a 100644 --- a/server/events/yaml/valid/repo_cfg.go +++ b/server/events/yaml/valid/repo_cfg.go @@ -116,6 +116,11 @@ func (p Project) GetName() string { if p.Name != nil { return *p.Name } + // TODO + // Upstream atlantis only requires project name to be set if there's more than one project + // with same dir and workspace. If a project name has not been set, we'll use the dir and + // workspace to build project key. + // Source: https://www.runatlantis.io/docs/repo-level-atlantis-yaml.html#reference return "" } diff --git a/server/feature/allocator.go b/server/feature/allocator.go index 3bb4ba865..3b96b42af 100644 --- a/server/feature/allocator.go +++ b/server/feature/allocator.go @@ -6,7 +6,7 @@ import ( "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/events/vcs" "github.com/runatlantis/atlantis/server/logging" - "github.com/thomaspoignant/go-feature-flag" + ffclient "github.com/thomaspoignant/go-feature-flag" "github.com/thomaspoignant/go-feature-flag/ffuser" ) diff --git a/server/handlers/mocks/matchers/chan_of_string.go b/server/handlers/mocks/matchers/chan_of_string.go index 512e2a598..e1bfee572 100644 --- a/server/handlers/mocks/matchers/chan_of_string.go +++ b/server/handlers/mocks/matchers/chan_of_string.go @@ -2,9 +2,8 @@ package matchers import ( - "reflect" "github.com/petergtz/pegomock" - + "reflect" ) func AnyChanOfString() chan string { @@ -18,3 +17,15 @@ func EqChanOfString(value chan string) chan string { var nullValue chan string return nullValue } + +func NotEqChanOfString(value chan string) chan string { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue chan string + return nullValue +} + +func ChanOfStringThat(matcher pegomock.ArgumentMatcher) chan string { + pegomock.RegisterMatcher(matcher) + var nullValue chan string + return nullValue +} diff --git a/server/handlers/mocks/matchers/models_commandname.go b/server/handlers/mocks/matchers/models_commandname.go index 6eba95f8e..f586b4d21 100644 --- a/server/handlers/mocks/matchers/models_commandname.go +++ b/server/handlers/mocks/matchers/models_commandname.go @@ -2,8 +2,9 @@ package matchers import ( - "reflect" "github.com/petergtz/pegomock" + "reflect" + models "github.com/runatlantis/atlantis/server/events/models" ) @@ -18,3 +19,15 @@ func EqModelsCommandName(value models.CommandName) models.CommandName { var nullValue models.CommandName return nullValue } + +func NotEqModelsCommandName(value models.CommandName) models.CommandName { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue models.CommandName + return nullValue +} + +func ModelsCommandNameThat(matcher pegomock.ArgumentMatcher) models.CommandName { + pegomock.RegisterMatcher(matcher) + var nullValue models.CommandName + return nullValue +} diff --git a/server/handlers/mocks/matchers/models_commitstatus.go b/server/handlers/mocks/matchers/models_commitstatus.go index 0f579baf8..1e10ed782 100644 --- a/server/handlers/mocks/matchers/models_commitstatus.go +++ b/server/handlers/mocks/matchers/models_commitstatus.go @@ -2,8 +2,9 @@ package matchers import ( - "reflect" "github.com/petergtz/pegomock" + "reflect" + models "github.com/runatlantis/atlantis/server/events/models" ) @@ -18,3 +19,15 @@ func EqModelsCommitStatus(value models.CommitStatus) models.CommitStatus { var nullValue models.CommitStatus return nullValue } + +func NotEqModelsCommitStatus(value models.CommitStatus) models.CommitStatus { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue models.CommitStatus + return nullValue +} + +func ModelsCommitStatusThat(matcher pegomock.ArgumentMatcher) models.CommitStatus { + pegomock.RegisterMatcher(matcher) + var nullValue models.CommitStatus + return nullValue +} diff --git a/server/handlers/mocks/matchers/models_projectcommandcontext.go b/server/handlers/mocks/matchers/models_projectcommandcontext.go index 1b68eb9e3..535f8b967 100644 --- a/server/handlers/mocks/matchers/models_projectcommandcontext.go +++ b/server/handlers/mocks/matchers/models_projectcommandcontext.go @@ -2,8 +2,9 @@ package matchers import ( - "reflect" "github.com/petergtz/pegomock" + "reflect" + models "github.com/runatlantis/atlantis/server/events/models" ) @@ -18,3 +19,15 @@ func EqModelsProjectCommandContext(value models.ProjectCommandContext) models.Pr var nullValue models.ProjectCommandContext return nullValue } + +func NotEqModelsProjectCommandContext(value models.ProjectCommandContext) models.ProjectCommandContext { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue models.ProjectCommandContext + return nullValue +} + +func ModelsProjectCommandContextThat(matcher pegomock.ArgumentMatcher) models.ProjectCommandContext { + pegomock.RegisterMatcher(matcher) + var nullValue models.ProjectCommandContext + return nullValue +} diff --git a/server/handlers/mocks/mock_project_command_output_handler.go b/server/handlers/mocks/mock_project_command_output_handler.go index faf12dfcf..462a59cde 100644 --- a/server/handlers/mocks/mock_project_command_output_handler.go +++ b/server/handlers/mocks/mock_project_command_output_handler.go @@ -25,27 +25,35 @@ func NewMockProjectCommandOutputHandler(options ...pegomock.Option) *MockProject func (mock *MockProjectCommandOutputHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockProjectCommandOutputHandler) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) { +func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{ctx} + params := []pegomock.Param{_param0} + pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) +} + +func (mock *MockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{_param0} pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { +func (mock *MockProjectCommandOutputHandler) Handle() { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{ctx, msg} - pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) + params := []pegomock.Param{} + pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(string) error) error { +func (mock *MockProjectCommandOutputHandler) Receive(_param0 string, _param1 chan string, _param2 func(string) error) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{projectInfo, receiver, callback} + params := []pegomock.Param{_param0, _param1, _param2} result := pegomock.GetGenericMockFrom(mock).Invoke("Receive", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) var ret0 error if len(result) != 0 { @@ -56,19 +64,19 @@ func (mock *MockProjectCommandOutputHandler) Receive(projectInfo string, receive return ret0 } -func (mock *MockProjectCommandOutputHandler) Handle() { +func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{} - pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) + params := []pegomock.Param{_param0, _param1} + pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { +func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{ctx, cmdName, status} + params := []pegomock.Param{_param0, _param1, _param2} result := pegomock.GetGenericMockFrom(mock).Invoke("SetJobURLWithStatus", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) var ret0 error if len(result) != 0 { @@ -86,14 +94,14 @@ func (mock *MockProjectCommandOutputHandler) VerifyWasCalledOnce() *VerifierMock } } -func (mock *MockProjectCommandOutputHandler) VerifyWasCalled(invocationCountMatcher pegomock.Matcher) *VerifierMockProjectCommandOutputHandler { +func (mock *MockProjectCommandOutputHandler) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockProjectCommandOutputHandler { return &VerifierMockProjectCommandOutputHandler{ mock: mock, invocationCountMatcher: invocationCountMatcher, } } -func (mock *MockProjectCommandOutputHandler) VerifyWasCalledInOrder(invocationCountMatcher pegomock.Matcher, inOrderContext *pegomock.InOrderContext) *VerifierMockProjectCommandOutputHandler { +func (mock *MockProjectCommandOutputHandler) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockProjectCommandOutputHandler { return &VerifierMockProjectCommandOutputHandler{ mock: mock, invocationCountMatcher: invocationCountMatcher, @@ -101,7 +109,7 @@ func (mock *MockProjectCommandOutputHandler) VerifyWasCalledInOrder(invocationCo } } -func (mock *MockProjectCommandOutputHandler) VerifyWasCalledEventually(invocationCountMatcher pegomock.Matcher, timeout time.Duration) *VerifierMockProjectCommandOutputHandler { +func (mock *MockProjectCommandOutputHandler) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockProjectCommandOutputHandler { return &VerifierMockProjectCommandOutputHandler{ mock: mock, invocationCountMatcher: invocationCountMatcher, @@ -111,71 +119,84 @@ func (mock *MockProjectCommandOutputHandler) VerifyWasCalledEventually(invocatio type VerifierMockProjectCommandOutputHandler struct { mock *MockProjectCommandOutputHandler - invocationCountMatcher pegomock.Matcher + invocationCountMatcher pegomock.InvocationCountMatcher inOrderContext *pegomock.InOrderContext timeout time.Duration } -func (verifier *VerifierMockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { - params := []pegomock.Param{ctx} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(_param0 string) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) + return &MockProjectCommandOutputHandler_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { +type MockProjectCommandOutputHandler_CleanUp_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { - ctx := c.GetAllCapturedArguments() - return ctx[len(ctx)-1] +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] } -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + _param0 = make([]string, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(models.ProjectCommandContext) + _param0[u] = param.(string) } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) *MockProjectCommandOutputHandler_Send_OngoingVerification { - params := []pegomock.Param{ctx, msg} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Send_OngoingVerification struct { +type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { - ctx, msg := c.GetAllCapturedArguments() - return ctx[len(ctx)-1], msg[len(msg)-1] +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(models.ProjectCommandContext) } - _param1 = make([]string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(string) - } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(string) error) *MockProjectCommandOutputHandler_Receive_OngoingVerification { - params := []pegomock.Param{projectInfo, receiver, callback} +func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { +} + +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierMockProjectCommandOutputHandler) Receive(_param0 string, _param1 chan string, _param2 func(string) error) *MockProjectCommandOutputHandler_Receive_OngoingVerification { + params := []pegomock.Param{_param0, _param1, _param2} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Receive", params, verifier.timeout) return &MockProjectCommandOutputHandler_Receive_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -186,8 +207,8 @@ type MockProjectCommandOutputHandler_Receive_OngoingVerification struct { } func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetCapturedArguments() (string, chan string, func(string) error) { - projectInfo, receiver, callback := c.GetAllCapturedArguments() - return projectInfo[len(projectInfo)-1], receiver[len(receiver)-1], callback[len(callback)-1] + _param0, _param1, _param2 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] } func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string, _param2 []func(string) error) { @@ -209,25 +230,39 @@ func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapt return } -func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) *MockProjectCommandOutputHandler_Send_OngoingVerification { + params := []pegomock.Param{_param0, _param1} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { +type MockProjectCommandOutputHandler_Send_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] } -func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.ProjectCommandContext) + } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } + } + return } -func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { - params := []pegomock.Param{ctx, cmdName, status} +func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { + params := []pegomock.Param{_param0, _param1, _param2} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetJobURLWithStatus", params, verifier.timeout) return &MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -238,8 +273,8 @@ type MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification str } func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, models.CommandName, models.CommitStatus) { - ctx, cmdName, status := c.GetAllCapturedArguments() - return ctx[len(ctx)-1], cmdName[len(cmdName)-1], status[len(status)-1] + _param0, _param1, _param2 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] } func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []models.CommandName, _param2 []models.CommitStatus) { diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go index ff417d4a2..b8adce66d 100644 --- a/server/handlers/project_command_output_handler.go +++ b/server/handlers/project_command_output_handler.go @@ -58,6 +58,14 @@ type ProjectCommandOutputHandler interface { // SetJobURLWithStatus sets the commit status for the project represented by // ctx and updates the status with and url to a job. SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error + + ResourceCleaner +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_resource_cleaner.go ResourceCleaner + +type ResourceCleaner interface { + CleanUp(pull string) } func NewAsyncProjectCommandOutputHandler( @@ -87,7 +95,7 @@ func (p *AsyncProjectCommandOutputHandler) Receive(projectInfo string, receiver // Avoid deadlock when projectOutputBuffer size is greater than the channel (currently set to 1000) // Running this as a goroutine allows for the channel to be read in callback go p.addChan(receiver, projectInfo) - defer p.cleanUp(projectInfo, receiver) + defer p.removeChan(projectInfo, receiver) for msg := range receiver { if err := callback(msg); err != nil { @@ -169,7 +177,7 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string } //Remove channel, so client no longer receives Terraform output -func (p *AsyncProjectCommandOutputHandler) cleanUp(pull string, ch chan string) { +func (p *AsyncProjectCommandOutputHandler) removeChan(pull string, ch chan string) { p.receiverBuffersLock.Lock() delete(p.receiverBuffers[pull], ch) p.receiverBuffersLock.Unlock() @@ -183,6 +191,19 @@ func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(pull string) [ return p.projectOutputBuffers[pull] } +func (p *AsyncProjectCommandOutputHandler) CleanUp(pull string) { + p.projectOutputBuffersLock.Lock() + delete(p.projectOutputBuffers, pull) + p.projectOutputBuffersLock.Unlock() + + p.receiverBuffersLock.Lock() + for ch := range p.receiverBuffers[pull] { + close(ch) + } + delete(p.receiverBuffers, pull) + p.receiverBuffersLock.Unlock() +} + // FeatureAwareOutputHandler is a decorator that add feature allocator // functionality to the AsyncProjectCommandOutputHandler type FeatureAwareOutputHandler struct { diff --git a/server/server.go b/server/server.go index 517205107..e16d8ed30 100644 --- a/server/server.go +++ b/server/server.go @@ -425,12 +425,13 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { statsScope, logger, &events.PullClosedExecutor{ - VCSClient: vcsClient, - Locker: lockingClient, - WorkingDir: workingDir, - Logger: logger, - DB: boltdb, - PullClosedTemplate: &events.PullClosedEventTemplate{}, + Locker: lockingClient, + WorkingDir: workingDir, + Logger: logger, + DB: boltdb, + PullClosedTemplate: &events.PullClosedEventTemplate{}, + LogStreamResourceCleaner: projectCmdOutputHandler, + VCSClient: vcsClient, }, ) eventParser := &events.EventParser{ @@ -719,11 +720,12 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { statsScope, logger, &events.PullClosedExecutor{ - VCSClient: vcsClient, - Locker: lockingClient, - WorkingDir: workingDir, - Logger: logger, - DB: boltdb, + VCSClient: vcsClient, + Locker: lockingClient, + WorkingDir: workingDir, + Logger: logger, + DB: boltdb, + LogStreamResourceCleaner: projectCmdOutputHandler, // using a specific template to signal that this is from an async process PullClosedTemplate: NewGCStaleClosedPull(), @@ -731,11 +733,12 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { // using a pullclosed executor for stale open PRs. Naming is weird, we need to come up with something better. &events.PullClosedExecutor{ - VCSClient: vcsClient, - Locker: lockingClient, - WorkingDir: workingDir, - Logger: logger, - DB: boltdb, + VCSClient: vcsClient, + Locker: lockingClient, + WorkingDir: workingDir, + Logger: logger, + DB: boltdb, + LogStreamResourceCleaner: projectCmdOutputHandler, // using a specific template to signal that this is from an async process PullClosedTemplate: NewGCStaleOpenPull(),