diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index b83b62938..432709fee 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -36,9 +36,9 @@ import ( "github.com/runatlantis/atlantis/server/events/webhooks" "github.com/runatlantis/atlantis/server/events/yaml" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/controllers/logstreaming_controller.go b/server/controllers/jobs_controller.go similarity index 76% rename from server/controllers/logstreaming_controller.go rename to server/controllers/jobs_controller.go index 6bfd10e0a..9134620a4 100644 --- a/server/controllers/logstreaming_controller.go +++ b/server/controllers/jobs_controller.go @@ -8,13 +8,13 @@ import ( "strconv" "github.com/gorilla/mux" - "github.com/gorilla/websocket" stats "github.com/lyft/gostats" + "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/controllers/templates" + "github.com/runatlantis/atlantis/server/controllers/websocket" "github.com/runatlantis/atlantis/server/core/db" "github.com/runatlantis/atlantis/server/events/metrics" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" ) @@ -25,10 +25,20 @@ type JobsController struct { ProjectJobsTemplate templates.TemplateWriter ProjectJobsErrorTemplate templates.TemplateWriter Db *db.BoltDB + WsMux *websocket.Multiplexor + StatsScope stats.Scope +} + +type ProjectInfoKeyGenerator struct{} + +func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) { + projectInfo, err := newProjectInfo(r) + + if err != nil { + return "", errors.Wrap(err, "creating project info") + } - WebsocketHandler handlers.WebsocketHandler - ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler - StatsScope stats.Scope + return projectInfo.String(), nil } type pullInfo struct { @@ -132,37 +142,9 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request) } func (j *JobsController) getProjectJobsWS(w http.ResponseWriter, r *http.Request) error { - projectInfo, err := newProjectInfo(r) - if err != nil { - j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) - return err - } - - c, err := j.WebsocketHandler.Upgrade(w, r, nil) - if err != nil { - j.Logger.Warn("Failed to upgrade websocket: %s", err) - return err - } - - // Buffer size set to 1000 to ensure messages get queued (upto 1000) if the receiverCh is not ready to - // receive messages before the channel is closed and resources cleaned up. - receiver := make(chan string, 1000) - j.WebsocketHandler.SetCloseHandler(c, receiver) - - // Add a reader goroutine to listen for socket.close() events. - go j.WebsocketHandler.SetReadHandler(c) - - pull := projectInfo.String() - err = j.ProjectCommandOutputHandler.Receive(pull, receiver, func(msg string) error { - if err := c.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil { - j.Logger.Warn("Failed to write ws message: %s", err) - return err - } - return nil - }) + err := j.WsMux.Handle(w, r) if err != nil { - j.Logger.Warn("Failed to receive message: %s", err) j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) return err } diff --git a/server/controllers/logstreaming_controller_test.go b/server/controllers/logstreaming_controller_test.go deleted file mode 100644 index c8f30e555..000000000 --- a/server/controllers/logstreaming_controller_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package controllers_test - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/gorilla/mux" - "github.com/runatlantis/atlantis/server/controllers" - "github.com/runatlantis/atlantis/server/logging" - - . "github.com/petergtz/pegomock" - "github.com/runatlantis/atlantis/server/handlers/mocks" - "github.com/runatlantis/atlantis/server/handlers/mocks/matchers" -) - -func TestGetProjectJobs_WebSockets(t *testing.T) { - t.Run("Should Group by Project Info", func(t *testing.T) { - RegisterMockTestingT(t) - websocketMock := mocks.NewMockWebsocketHandler() - projectOutputHandler := mocks.NewMockProjectCommandOutputHandler() - logger := logging.NewNoopLogger(t) - webSocketWrapper := mocks.NewMockWebsocketConnectionWrapper() - params := map[string]string{ - "org": "test-org", - "repo": "test-repo", - "pull": "1", - "project": "test-project", - } - request, _ := http.NewRequest(http.MethodGet, "/logStreaming/org/repo/1/project/ws", nil) - request = mux.SetURLVars(request, params) - response := httptest.NewRecorder() - jobsController := &controllers.JobsController{ - Logger: logger, - WebsocketHandler: websocketMock, - ProjectCommandOutputHandler: projectOutputHandler, - } - - When(websocketMock.Upgrade(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest(), matchers.AnyHttpHeader())).ThenReturn(webSocketWrapper, nil) - - jobsController.GetProjectJobsWS(response, request) - - webSocketWrapper.VerifyWasCalled(Once()) - }) -} diff --git a/server/controllers/websocket/mux.go b/server/controllers/websocket/mux.go new file mode 100644 index 000000000..ccfbdf99f --- /dev/null +++ b/server/controllers/websocket/mux.go @@ -0,0 +1,63 @@ +package websocket + +import ( + "net/http" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/logging" +) + +// PartitionKeyGenerator generates partition keys for the multiplexor +type PartitionKeyGenerator interface { + Generate(r *http.Request) (string, error) +} + +// PartitionRegistry is the registry holding each partition +// and is responsible for registering/deregistering new buffers +type PartitionRegistry interface { + Register(key string, buffer chan string) + Deregister(key string, buffer chan string) +} + +// Multiplexor is responsible for handling the data transfer between the storage layer +// and the registry. Note this is still a WIP as right now the registry is assumed to handle +// everything. +type Multiplexor struct { + writer *Writer + keyGenerator PartitionKeyGenerator + registry PartitionRegistry +} + +func NewMultiplexor(log logging.SimpleLogging, keyGenerator PartitionKeyGenerator, registry PartitionRegistry) *Multiplexor { + upgrader := websocket.Upgrader{} + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + return &Multiplexor{ + writer: &Writer{ + upgrader: upgrader, + log: log, + }, + keyGenerator: keyGenerator, + registry: registry, + } +} + +// Handle should be called for a given websocket request. It blocks +// while writing to the websocket until the buffer is closed. +func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error { + key, err := m.keyGenerator.Generate(r) + + if err != nil { + return errors.Wrapf(err, "generating partition key") + } + + // Buffer size set to 1000 to ensure messages get queued. + // TODO: make buffer size configurable + buffer := make(chan string, 1000) + + // spinning up a goroutine for this since we are attempting to block on the read side. + go m.registry.Register(key, buffer) + defer m.registry.Deregister(key, buffer) + + return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key) +} diff --git a/server/controllers/websocket/writer.go b/server/controllers/websocket/writer.go new file mode 100644 index 000000000..eca3f6ae6 --- /dev/null +++ b/server/controllers/websocket/writer.go @@ -0,0 +1,70 @@ +package websocket + +import ( + "net/http" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/logging" +) + +func NewWriter(log logging.SimpleLogging) *Writer { + upgrader := websocket.Upgrader{} + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + return &Writer{ + upgrader: upgrader, + log: log, + } +} + +type Writer struct { + upgrader websocket.Upgrader + + //TODO: Remove dependency on atlantis logger here if we upstream this. + log logging.SimpleLogging +} + +func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error { + conn, err := w.upgrader.Upgrade(rw, r, nil) + + if err != nil { + return errors.Wrap(err, "upgrading websocket connection") + } + + conn.SetCloseHandler(func(code int, text string) error { + // Close the channnel after websocket connection closed. + // Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup. + // is it good practice to close at the receiver? Probably not, we should figure out a better + // way to handle this case + close(input) + return nil + }) + + // Add a reader goroutine to listen for socket.close() events. + go w.setReadHandler(conn) + + // block on reading our input channel + for msg := range input { + if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil { + w.log.Warn("Failed to write ws message: %s", err) + return err + } + } + + return nil +} + +func (w *Writer) setReadHandler(c *websocket.Conn) { + for { + _, _, err := c.ReadMessage() + if err != nil { + // CloseGoingAway (1001) when a browser tab is closed. + // Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + w.log.Warn("Failed to read WS message: %s", err) + } + return + } + } + +} diff --git a/server/core/terraform/terraform_client.go b/server/core/terraform/terraform_client.go index 3026b5e7e..2b3f1fe05 100644 --- a/server/core/terraform/terraform_client.go +++ b/server/core/terraform/terraform_client.go @@ -33,9 +33,9 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/terraform/ansi" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" ) var LogStreamingValidCmds = [...]string{"init", "plan", "apply"} diff --git a/server/core/terraform/terraform_client_internal_test.go b/server/core/terraform/terraform_client_internal_test.go index e1c7f602c..abee1db2f 100644 --- a/server/core/terraform/terraform_client_internal_test.go +++ b/server/core/terraform/terraform_client_internal_test.go @@ -10,10 +10,10 @@ import ( version "github.com/hashicorp/go-version" . "github.com/petergtz/pegomock" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/feature" - fmocks "github.com/runatlantis/atlantis/server/feature/mocks" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/core/terraform/terraform_client_test.go b/server/core/terraform/terraform_client_test.go index 2f1abff24..0964abf6b 100644 --- a/server/core/terraform/terraform_client_test.go +++ b/server/core/terraform/terraform_client_test.go @@ -28,10 +28,10 @@ import ( "github.com/runatlantis/atlantis/server/core/terraform" "github.com/runatlantis/atlantis/server/core/terraform/mocks" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/feature" - fmocks "github.com/runatlantis/atlantis/server/feature/mocks" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/events/command_runner.go b/server/events/command_runner.go index cb15e75eb..1a780991d 100644 --- a/server/events/command_runner.go +++ b/server/events/command_runner.go @@ -24,6 +24,7 @@ import ( "github.com/runatlantis/atlantis/server/events/vcs" "github.com/runatlantis/atlantis/server/events/yaml/valid" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" "github.com/runatlantis/atlantis/server/recovery" gitlab "github.com/xanzy/go-gitlab" ) diff --git a/server/events/command_runner_test.go b/server/events/command_runner_test.go index 3246cb89b..a6da9698d 100644 --- a/server/events/command_runner_test.go +++ b/server/events/command_runner_test.go @@ -36,6 +36,8 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/models/fixtures" vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) @@ -46,6 +48,7 @@ var azuredevopsGetter *mocks.MockAzureDevopsPullGetter var githubGetter *mocks.MockGithubPullGetter var gitlabGetter *mocks.MockGitlabMergeRequestGetter var ch events.DefaultCommandRunner +var fa events.FeatureAwareCommandRunner var workingDir events.WorkingDir var pendingPlanFinder *mocks.MockPendingPlanFinder var drainer *events.Drainer @@ -382,6 +385,46 @@ func TestRunCommentCommand_DisableApplyAllDisabled(t *testing.T) { vcsClient.VerifyWasCalledOnce().CreateComment(fixtures.GithubRepo, modelPull.Num, "**Error:** Running `atlantis apply` without flags is disabled. You must specify which project to apply via the `-d `, `-w ` or `-p ` flags.", "apply") } +func TestFeatureAwareRunCommentCommandRunner_CommentWhenEnabled(t *testing.T) { + t.Log("if \"atlantis apply --force\" is run and this is enabled atlantis should" + + " comment with a warning") + vcsClient := setup(t) + allocator := fmocks.NewMockAllocator() + + fa = events.FeatureAwareCommandRunner{ + CommandRunner: &ch, + VCSClient: vcsClient, + Logger: logger, + FeatureAllocator: allocator, + } + + modelPull := models.PullRequest{BaseRepo: fixtures.GithubRepo, State: models.OpenPullState, Num: fixtures.Pull.Num} + When(allocator.ShouldAllocate(feature.ForceApply, "runatlantis/atlantis")).ThenReturn(true, nil) + + fa.RunCommentCommand(fixtures.GithubRepo, nil, nil, fixtures.User, modelPull.Num, &events.CommentCommand{Name: models.ApplyCommand, ForceApply: true}) + vcsClient.VerifyWasCalledOnce().CreateComment(fixtures.GithubRepo, modelPull.Num, "⚠️ WARNING ⚠️\n\n You have bypassed all apply requirements for this PR 🚀 . This can have unpredictable consequences 🙏🏽 and should only be used in an emergency 🆘 .\n\n 𝐓𝐡𝐢𝐬 𝐚𝐜𝐭𝐢𝐨𝐧 𝐰𝐢𝐥𝐥 𝐛𝐞 𝐚𝐮𝐝𝐢𝐭𝐞𝐝.\n", "") +} + +func TestFeatureAwareRunCommentCommandRunner_NoCommentWhenDisabled(t *testing.T) { + t.Log("if \"atlantis apply --force\" is run and this is disabled atlantis should" + + " not comment with a warning") + vcsClient := setup(t) + allocator := fmocks.NewMockAllocator() + + fa = events.FeatureAwareCommandRunner{ + CommandRunner: &ch, + VCSClient: vcsClient, + Logger: logger, + FeatureAllocator: allocator, + } + + modelPull := models.PullRequest{BaseRepo: fixtures.GithubRepo, State: models.OpenPullState, Num: fixtures.Pull.Num} + When(allocator.ShouldAllocate(feature.ForceApply, "runatlantis/atlantis")).ThenReturn(false, nil) + + fa.RunCommentCommand(fixtures.GithubRepo, nil, nil, fixtures.User, modelPull.Num, &events.CommentCommand{Name: models.ApplyCommand, ForceApply: true}) + vcsClient.VerifyWasCalled(Never()).CreateComment(fixtures.GithubRepo, modelPull.Num, "⚠️ WARNING ⚠️\n\n You have bypassed all apply requirements for this PR 🚀 . This can have unpredictable consequences 🙏🏽 and should only be used in an emergency 🆘 .\n\n 𝐓𝐡𝐢𝐬 𝐚𝐜𝐭𝐢𝐨𝐧 𝐰𝐢𝐥𝐥 𝐛𝐞 𝐚𝐮𝐝𝐢𝐭𝐞𝐝.\n", "") +} + func TestRunCommentCommand_DisableDisableAutoplan(t *testing.T) { t.Log("if \"DisableAutoplan is true\" are disabled and we are silencing return and do not comment with error") setup(t) diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go index 4db7ff966..0e20ed085 100644 --- a/server/events/project_command_runner.go +++ b/server/events/project_command_runner.go @@ -26,6 +26,7 @@ import ( "github.com/runatlantis/atlantis/server/events/yaml/valid" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" ) // DirNotExistErr is an error caused by the directory not existing. diff --git a/server/events/project_command_runner_test.go b/server/events/project_command_runner_test.go index ceb765212..a414a13de 100644 --- a/server/events/project_command_runner_test.go +++ b/server/events/project_command_runner_test.go @@ -31,6 +31,8 @@ import ( "github.com/runatlantis/atlantis/server/events/yaml/valid" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/handlers/instrumented_project_command_output_handler.go b/server/handlers/instrumented_project_command_output_handler.go index 1d4e646b2..3abbc1df7 100644 --- a/server/handlers/instrumented_project_command_output_handler.go +++ b/server/handlers/instrumented_project_command_output_handler.go @@ -4,6 +4,7 @@ import ( "fmt" stats "github.com/lyft/gostats" + "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/logging" ) @@ -13,15 +14,25 @@ type InstrumentedProjectCommandOutputHandler struct { logger logging.SimpleLogging } -func NewInstrumentedProjectCommandOutputHandler(prjCmdOutputHandler ProjectCommandOutputHandler, statsScope stats.Scope, logger logging.SimpleLogging) ProjectCommandOutputHandler { +func NewInstrumentedProjectCommandOutputHandler(projectCmdOutput chan *models.ProjectCmdOutputLine, + projectStatusUpdater ProjectStatusUpdater, + projectJobURLGenerator ProjectJobURLGenerator, + logger logging.SimpleLogging, + scope stats.Scope) ProjectCommandOutputHandler { + prjCmdOutputHandler := NewAsyncProjectCommandOutputHandler( + projectCmdOutput, + projectStatusUpdater, + projectJobURLGenerator, + logger, + ) return &InstrumentedProjectCommandOutputHandler{ ProjectCommandOutputHandler: prjCmdOutputHandler, - numWSConnnections: statsScope.Scope("getprojectjobs").Scope("websocket").NewGauge("connections"), + numWSConnnections: scope.Scope("getprojectjobs").Scope("websocket").NewGauge("connections"), logger: logger, } } -func (p *InstrumentedProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error { +func (p *InstrumentedProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { p.numWSConnnections.Inc() defer func() { // Log message to ensure numWSConnnections gauge is being updated properly. @@ -29,5 +40,16 @@ func (p *InstrumentedProjectCommandOutputHandler) Receive(projectInfo string, re p.logger.Debug(fmt.Sprintf("Decreasing num of ws connections for project: %s", projectInfo)) p.numWSConnnections.Dec() }() - return p.ProjectCommandOutputHandler.Receive(projectInfo, receiver, callback) + p.ProjectCommandOutputHandler.Register(projectInfo, receiver) +} + +func (p *InstrumentedProjectCommandOutputHandler) Deregister(projectInfo string, receiver chan string) { + p.numWSConnnections.Inc() + defer func() { + // Log message to ensure numWSConnnections gauge is being updated properly. + // [ORCA-955] TODO: Remove when removing the feature flag for log streaming. + p.logger.Info(fmt.Sprintf("Decreasing num of ws connections for project: %s", projectInfo)) + p.numWSConnnections.Dec() + }() + p.ProjectCommandOutputHandler.Deregister(projectInfo, receiver) } diff --git a/server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go b/server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go deleted file mode 100644 index 4d2124911..000000000 --- a/server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - handlers "github.com/runatlantis/atlantis/server/handlers" -) - -func AnyHandlersWebsocketConnectionWrapper() handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(handlers.WebsocketConnectionWrapper))(nil)).Elem())) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} - -func EqHandlersWebsocketConnectionWrapper(value handlers.WebsocketConnectionWrapper) handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} - -func NotEqHandlersWebsocketConnectionWrapper(value handlers.WebsocketConnectionWrapper) handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} - -func HandlersWebsocketConnectionWrapperThat(matcher pegomock.ArgumentMatcher) handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(matcher) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} diff --git a/server/handlers/mocks/mock_project_command_output_handler.go b/server/handlers/mocks/mock_project_command_output_handler.go index 462a59cde..08add4bd4 100644 --- a/server/handlers/mocks/mock_project_command_output_handler.go +++ b/server/handlers/mocks/mock_project_command_output_handler.go @@ -25,58 +25,51 @@ func NewMockProjectCommandOutputHandler(options ...pegomock.Option) *MockProject func (mock *MockProjectCommandOutputHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockProjectCommandOutputHandler) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 string) { +func (mock *MockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) + params := []pegomock.Param{ctx} + pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) { +func (mock *MockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) + params := []pegomock.Param{ctx, msg} + pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Handle() { +func (mock *MockProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{} - pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) + params := []pegomock.Param{projectInfo, receiver} + pegomock.GetGenericMockFrom(mock).Invoke("Register", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Receive(_param0 string, _param1 chan string, _param2 func(string) error) error { +func (mock *MockProjectCommandOutputHandler) Deregister(projectInfo string, receiver chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1, _param2} - result := pegomock.GetGenericMockFrom(mock).Invoke("Receive", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 + params := []pegomock.Param{projectInfo, receiver} + pegomock.GetGenericMockFrom(mock).Invoke("Deregister", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) { +func (mock *MockProjectCommandOutputHandler) Handle() { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) + params := []pegomock.Param{} + pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) error { +func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1, _param2} + params := []pegomock.Param{ctx, cmdName, status} result := pegomock.GetGenericMockFrom(mock).Invoke("SetJobURLWithStatus", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) var ret0 error if len(result) != 0 { @@ -87,6 +80,14 @@ func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models. return ret0 } +func (mock *MockProjectCommandOutputHandler) CleanUp(pull string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{pull} + pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) +} + func (mock *MockProjectCommandOutputHandler) VerifyWasCalledOnce() *VerifierMockProjectCommandOutputHandler { return &VerifierMockProjectCommandOutputHandler{ mock: mock, @@ -124,94 +125,112 @@ type VerifierMockProjectCommandOutputHandler struct { timeout time.Duration } -func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(_param0 string) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) - return &MockProjectCommandOutputHandler_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { + params := []pegomock.Param{ctx} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_CleanUp_OngoingVerification struct { +type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() string { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { + ctx := c.GetAllCapturedArguments() + return ctx[len(ctx)-1] } -func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]string, len(c.methodInvocations)) + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(string) + _param0[u] = param.(models.ProjectCommandContext) } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) *MockProjectCommandOutputHandler_Send_OngoingVerification { + params := []pegomock.Param{ctx, msg} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { +type MockProjectCommandOutputHandler_Send_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { + ctx, msg := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], msg[len(msg)-1] } -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(models.ProjectCommandContext) } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) *MockProjectCommandOutputHandler_Register_OngoingVerification { + params := []pegomock.Param{projectInfo, receiver} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Register", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Register_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { +type MockProjectCommandOutputHandler_Register_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { +func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetCapturedArguments() (string, chan string) { + projectInfo, receiver := c.GetAllCapturedArguments() + return projectInfo[len(projectInfo)-1], receiver[len(receiver)-1] } -func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { +func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([]chan string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(chan string) + } + } + return } -func (verifier *VerifierMockProjectCommandOutputHandler) Receive(_param0 string, _param1 chan string, _param2 func(string) error) *MockProjectCommandOutputHandler_Receive_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Receive", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Receive_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Deregister(projectInfo string, receiver chan string) *MockProjectCommandOutputHandler_Deregister_OngoingVerification { + params := []pegomock.Param{projectInfo, receiver} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Deregister", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Deregister_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Receive_OngoingVerification struct { +type MockProjectCommandOutputHandler_Deregister_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetCapturedArguments() (string, chan string, func(string) error) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] +func (c *MockProjectCommandOutputHandler_Deregister_OngoingVerification) GetCapturedArguments() (string, chan string) { + projectInfo, receiver := c.GetAllCapturedArguments() + return projectInfo[len(projectInfo)-1], receiver[len(receiver)-1] } -func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string, _param2 []func(string) error) { +func (c *MockProjectCommandOutputHandler_Deregister_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) @@ -222,47 +241,29 @@ func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapt for u, param := range params[1] { _param1[u] = param.(chan string) } - _param2 = make([]func(string) error, len(c.methodInvocations)) - for u, param := range params[2] { - _param2[u] = param.(func(string) error) - } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) *MockProjectCommandOutputHandler_Send_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Send_OngoingVerification struct { +type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(models.ProjectCommandContext) - } - _param1 = make([]string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(string) - } - } - return +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { } -func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} +func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { + params := []pegomock.Param{ctx, cmdName, status} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetJobURLWithStatus", params, verifier.timeout) return &MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -273,8 +274,8 @@ type MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification str } func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, models.CommandName, models.CommitStatus) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] + ctx, cmdName, status := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], cmdName[len(cmdName)-1], status[len(status)-1] } func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []models.CommandName, _param2 []models.CommitStatus) { @@ -295,3 +296,30 @@ func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification } return } + +func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(pull string) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { + params := []pegomock.Param{pull} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) + return &MockProjectCommandOutputHandler_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_CleanUp_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() string { + pull := c.GetAllCapturedArguments() + return pull[len(pull)-1] +} + +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} diff --git a/server/handlers/mocks/mock_resource_cleaner.go b/server/handlers/mocks/mock_resource_cleaner.go new file mode 100644 index 000000000..357faf36f --- /dev/null +++ b/server/handlers/mocks/mock_resource_cleaner.go @@ -0,0 +1,97 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ResourceCleaner) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + "reflect" + "time" +) + +type MockResourceCleaner struct { + fail func(message string, callerSkip ...int) +} + +func NewMockResourceCleaner(options ...pegomock.Option) *MockResourceCleaner { + mock := &MockResourceCleaner{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockResourceCleaner) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockResourceCleaner) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockResourceCleaner) CleanUp(pull string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockResourceCleaner().") + } + params := []pegomock.Param{pull} + pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) +} + +func (mock *MockResourceCleaner) VerifyWasCalledOnce() *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockResourceCleaner) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockResourceCleaner) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockResourceCleaner) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockResourceCleaner struct { + mock *MockResourceCleaner + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockResourceCleaner) CleanUp(pull string) *MockResourceCleaner_CleanUp_OngoingVerification { + params := []pegomock.Param{pull} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) + return &MockResourceCleaner_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockResourceCleaner_CleanUp_OngoingVerification struct { + mock *MockResourceCleaner + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetCapturedArguments() string { + pull := c.GetAllCapturedArguments() + return pull[len(pull)-1] +} + +func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} diff --git a/server/handlers/mocks/mock_websocket_connection_wrapper.go b/server/handlers/mocks/mock_websocket_connection_wrapper.go deleted file mode 100644 index f9293cc70..000000000 --- a/server/handlers/mocks/mock_websocket_connection_wrapper.go +++ /dev/null @@ -1,183 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketConnectionWrapper) - -package mocks - -import ( - pegomock "github.com/petergtz/pegomock" - "reflect" - "time" -) - -type MockWebsocketConnectionWrapper struct { - fail func(message string, callerSkip ...int) -} - -func NewMockWebsocketConnectionWrapper(options ...pegomock.Option) *MockWebsocketConnectionWrapper { - mock := &MockWebsocketConnectionWrapper{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockWebsocketConnectionWrapper) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockWebsocketConnectionWrapper) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockWebsocketConnectionWrapper) ReadMessage() (int, []byte, error) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketConnectionWrapper().") - } - params := []pegomock.Param{} - result := pegomock.GetGenericMockFrom(mock).Invoke("ReadMessage", params, []reflect.Type{reflect.TypeOf((*int)(nil)).Elem(), reflect.TypeOf((*[]byte)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 int - var ret1 []byte - var ret2 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(int) - } - if result[1] != nil { - ret1 = result[1].([]byte) - } - if result[2] != nil { - ret2 = result[2].(error) - } - } - return ret0, ret1, ret2 -} - -func (mock *MockWebsocketConnectionWrapper) SetCloseHandler(_param0 func(int, string) error) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketConnectionWrapper().") - } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("SetCloseHandler", params, []reflect.Type{}) -} - -func (mock *MockWebsocketConnectionWrapper) WriteMessage(_param0 int, _param1 []byte) error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketConnectionWrapper().") - } - params := []pegomock.Param{_param0, _param1} - result := pegomock.GetGenericMockFrom(mock).Invoke("WriteMessage", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalledOnce() *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockWebsocketConnectionWrapper struct { - mock *MockWebsocketConnectionWrapper - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockWebsocketConnectionWrapper) ReadMessage() *MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ReadMessage", params, verifier.timeout) - return &MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification struct { - mock *MockWebsocketConnectionWrapper - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification) GetCapturedArguments() { -} - -func (c *MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification) GetAllCapturedArguments() { -} - -func (verifier *VerifierMockWebsocketConnectionWrapper) SetCloseHandler(_param0 func(int, string) error) *MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetCloseHandler", params, verifier.timeout) - return &MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification struct { - mock *MockWebsocketConnectionWrapper - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification) GetCapturedArguments() func(int, string) error { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification) GetAllCapturedArguments() (_param0 []func(int, string) error) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]func(int, string) error, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(func(int, string) error) - } - } - return -} - -func (verifier *VerifierMockWebsocketConnectionWrapper) WriteMessage(_param0 int, _param1 []byte) *MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "WriteMessage", params, verifier.timeout) - return &MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification struct { - mock *MockWebsocketConnectionWrapper - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification) GetCapturedArguments() (int, []byte) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] -} - -func (c *MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification) GetAllCapturedArguments() (_param0 []int, _param1 [][]byte) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]int, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(int) - } - _param1 = make([][]byte, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.([]byte) - } - } - return -} diff --git a/server/handlers/mocks/mock_websocket_handler.go b/server/handlers/mocks/mock_websocket_handler.go deleted file mode 100644 index 7c7f2e592..000000000 --- a/server/handlers/mocks/mock_websocket_handler.go +++ /dev/null @@ -1,192 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketHandler) - -package mocks - -import ( - pegomock "github.com/petergtz/pegomock" - handlers "github.com/runatlantis/atlantis/server/handlers" - http "net/http" - "reflect" - "time" -) - -type MockWebsocketHandler struct { - fail func(message string, callerSkip ...int) -} - -func NewMockWebsocketHandler(options ...pegomock.Option) *MockWebsocketHandler { - mock := &MockWebsocketHandler{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockWebsocketHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockWebsocketHandler) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockWebsocketHandler) SetCloseHandler(_param0 handlers.WebsocketConnectionWrapper, _param1 chan string) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") - } - params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("SetCloseHandler", params, []reflect.Type{}) -} - -func (mock *MockWebsocketHandler) SetReadHandler(_param0 handlers.WebsocketConnectionWrapper) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") - } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("SetReadHandler", params, []reflect.Type{}) -} - -func (mock *MockWebsocketHandler) Upgrade(_param0 http.ResponseWriter, _param1 *http.Request, _param2 http.Header) (handlers.WebsocketConnectionWrapper, error) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") - } - params := []pegomock.Param{_param0, _param1, _param2} - result := pegomock.GetGenericMockFrom(mock).Invoke("Upgrade", params, []reflect.Type{reflect.TypeOf((*handlers.WebsocketConnectionWrapper)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 handlers.WebsocketConnectionWrapper - var ret1 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(handlers.WebsocketConnectionWrapper) - } - if result[1] != nil { - ret1 = result[1].(error) - } - } - return ret0, ret1 -} - -func (mock *MockWebsocketHandler) VerifyWasCalledOnce() *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockWebsocketHandler) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockWebsocketHandler) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockWebsocketHandler) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockWebsocketHandler struct { - mock *MockWebsocketHandler - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockWebsocketHandler) SetCloseHandler(_param0 handlers.WebsocketConnectionWrapper, _param1 chan string) *MockWebsocketHandler_SetCloseHandler_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetCloseHandler", params, verifier.timeout) - return &MockWebsocketHandler_SetCloseHandler_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketHandler_SetCloseHandler_OngoingVerification struct { - mock *MockWebsocketHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketHandler_SetCloseHandler_OngoingVerification) GetCapturedArguments() (handlers.WebsocketConnectionWrapper, chan string) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] -} - -func (c *MockWebsocketHandler_SetCloseHandler_OngoingVerification) GetAllCapturedArguments() (_param0 []handlers.WebsocketConnectionWrapper, _param1 []chan string) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]handlers.WebsocketConnectionWrapper, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(handlers.WebsocketConnectionWrapper) - } - _param1 = make([]chan string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(chan string) - } - } - return -} - -func (verifier *VerifierMockWebsocketHandler) SetReadHandler(_param0 handlers.WebsocketConnectionWrapper) *MockWebsocketHandler_SetReadHandler_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetReadHandler", params, verifier.timeout) - return &MockWebsocketHandler_SetReadHandler_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketHandler_SetReadHandler_OngoingVerification struct { - mock *MockWebsocketHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketHandler_SetReadHandler_OngoingVerification) GetCapturedArguments() handlers.WebsocketConnectionWrapper { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockWebsocketHandler_SetReadHandler_OngoingVerification) GetAllCapturedArguments() (_param0 []handlers.WebsocketConnectionWrapper) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]handlers.WebsocketConnectionWrapper, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(handlers.WebsocketConnectionWrapper) - } - } - return -} - -func (verifier *VerifierMockWebsocketHandler) Upgrade(_param0 http.ResponseWriter, _param1 *http.Request, _param2 http.Header) *MockWebsocketHandler_Upgrade_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Upgrade", params, verifier.timeout) - return &MockWebsocketHandler_Upgrade_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketHandler_Upgrade_OngoingVerification struct { - mock *MockWebsocketHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketHandler_Upgrade_OngoingVerification) GetCapturedArguments() (http.ResponseWriter, *http.Request, http.Header) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] -} - -func (c *MockWebsocketHandler_Upgrade_OngoingVerification) GetAllCapturedArguments() (_param0 []http.ResponseWriter, _param1 []*http.Request, _param2 []http.Header) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]http.ResponseWriter, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(http.ResponseWriter) - } - _param1 = make([]*http.Request, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(*http.Request) - } - _param2 = make([]http.Header, len(c.methodInvocations)) - for u, param := range params[2] { - _param2[u] = param.(http.Header) - } - } - return -} diff --git a/server/handlers/mocks/mock_websocket_response_writer.go b/server/handlers/mocks/mock_websocket_response_writer.go deleted file mode 100644 index 819c8fe81..000000000 --- a/server/handlers/mocks/mock_websocket_response_writer.go +++ /dev/null @@ -1,140 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketResponseWriter) - -package mocks - -import ( - pegomock "github.com/petergtz/pegomock" - "reflect" - "time" -) - -type MockWebsocketResponseWriter struct { - fail func(message string, callerSkip ...int) -} - -func NewMockWebsocketResponseWriter(options ...pegomock.Option) *MockWebsocketResponseWriter { - mock := &MockWebsocketResponseWriter{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockWebsocketResponseWriter) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockWebsocketResponseWriter) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockWebsocketResponseWriter) WriteMessage(messageType int, data []byte) error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketResponseWriter().") - } - params := []pegomock.Param{messageType, data} - result := pegomock.GetGenericMockFrom(mock).Invoke("WriteMessage", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockWebsocketResponseWriter) Close() error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketResponseWriter().") - } - params := []pegomock.Param{} - result := pegomock.GetGenericMockFrom(mock).Invoke("Close", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalledOnce() *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockWebsocketResponseWriter struct { - mock *MockWebsocketResponseWriter - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockWebsocketResponseWriter) WriteMessage(messageType int, data []byte) *MockWebsocketResponseWriter_WriteMessage_OngoingVerification { - params := []pegomock.Param{messageType, data} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "WriteMessage", params, verifier.timeout) - return &MockWebsocketResponseWriter_WriteMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketResponseWriter_WriteMessage_OngoingVerification struct { - mock *MockWebsocketResponseWriter - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketResponseWriter_WriteMessage_OngoingVerification) GetCapturedArguments() (int, []byte) { - messageType, data := c.GetAllCapturedArguments() - return messageType[len(messageType)-1], data[len(data)-1] -} - -func (c *MockWebsocketResponseWriter_WriteMessage_OngoingVerification) GetAllCapturedArguments() (_param0 []int, _param1 [][]byte) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]int, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(int) - } - _param1 = make([][]byte, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.([]byte) - } - } - return -} - -func (verifier *VerifierMockWebsocketResponseWriter) Close() *MockWebsocketResponseWriter_Close_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Close", params, verifier.timeout) - return &MockWebsocketResponseWriter_Close_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketResponseWriter_Close_OngoingVerification struct { - mock *MockWebsocketResponseWriter - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketResponseWriter_Close_OngoingVerification) GetCapturedArguments() { -} - -func (c *MockWebsocketResponseWriter_Close_OngoingVerification) GetAllCapturedArguments() { -} diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go index a83899cff..2504e9649 100644 --- a/server/handlers/project_command_output_handler.go +++ b/server/handlers/project_command_output_handler.go @@ -1,12 +1,9 @@ package handlers import ( - "sync" - - stats "github.com/lyft/gostats" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/logging" + "sync" ) // AsyncProjectCommandOutputHandler is a handler to transport terraform client @@ -50,8 +47,12 @@ type ProjectCommandOutputHandler interface { // Send will enqueue the msg and wait for Handle() to receive the message. Send(ctx models.ProjectCommandContext, msg string) - // Receive will create a channel for projectPullInfo and run a callback function argument when the new channel receives a message. - Receive(projectInfo string, receiver chan string, callback func(msg string) error) error + // Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting + // to read the channel in the same goroutine + Register(projectInfo string, receiver chan string) + + // Deregister removes a channel from successive updates and closes it. + Deregister(projectInfo string, receiver chan string) // Listens for msg from channel Handle() @@ -92,19 +93,8 @@ func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext } } -func (p *AsyncProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error { - // Avoid deadlock when projectOutputBuffer size is greater than the channel (currently set to 1000) - // Running this as a goroutine allows for the channel to be read in callback - go p.addChan(receiver, projectInfo) - defer p.removeChan(projectInfo, receiver) - - for msg := range receiver { - if err := callback(msg); err != nil { - return err - } - } - - return nil +func (p *AsyncProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { + p.addChan(receiver, projectInfo) } func (p *AsyncProjectCommandOutputHandler) Handle() { @@ -140,18 +130,22 @@ func (p *AsyncProjectCommandOutputHandler) clearLogLines(pull string) { } func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, pull string) { + p.projectOutputBuffersLock.RLock() + buffer := p.projectOutputBuffers[pull] + p.projectOutputBuffersLock.RUnlock() + + for _, line := range buffer { + ch <- line + } + + // add the channel to our registry after we backfill the contents of the buffer, + // to prevent new messages coming in interleaving with this backfill. p.receiverBuffersLock.Lock() if p.receiverBuffers[pull] == nil { p.receiverBuffers[pull] = map[chan string]bool{} } p.receiverBuffers[pull][ch] = true p.receiverBuffersLock.Unlock() - - p.projectOutputBuffersLock.RLock() - for _, line := range p.projectOutputBuffers[pull] { - ch <- line - } - p.projectOutputBuffersLock.RUnlock() } //Add log line to buffer and send to all current channels @@ -186,7 +180,8 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string } //Remove channel, so client no longer receives Terraform output -func (p *AsyncProjectCommandOutputHandler) removeChan(pull string, ch chan string) { +func (p *AsyncProjectCommandOutputHandler) Deregister(pull string, ch chan string) { + p.logger.Debug("Removing channel for %s", pull) p.receiverBuffersLock.Lock() delete(p.receiverBuffers[pull], ch) p.receiverBuffersLock.Unlock() @@ -213,79 +208,14 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pull string) { p.receiverBuffersLock.Unlock() } -// [ORCA-955] - Remove feature flag for log-streaming -// FeatureAwareOutputHandler is a decorator that add feature allocator -// functionality to the AsyncProjectCommandOutputHandler -type FeatureAwareOutputHandler struct { - FeatureAllocator feature.Allocator - ProjectCommandOutputHandler -} - -func NewFeatureAwareOutputHandler( - projectCmdOutput chan *models.ProjectCmdOutputLine, - projectStatusUpdater ProjectStatusUpdater, - projectJobURLGenerator ProjectJobURLGenerator, - logger logging.SimpleLogging, - featureAllocator feature.Allocator, - scope stats.Scope, -) ProjectCommandOutputHandler { - prjCmdOutputHandler := NewAsyncProjectCommandOutputHandler( - projectCmdOutput, - projectStatusUpdater, - projectJobURLGenerator, - logger, - ) - return &FeatureAwareOutputHandler{ - FeatureAllocator: featureAllocator, - ProjectCommandOutputHandler: NewInstrumentedProjectCommandOutputHandler(prjCmdOutputHandler, scope, logger), - } -} - -// Helper function to check if the log-streaming feature is enabled -// It dynamically decides based on repo name that is defined in the models.ProjectCommandContext -func (p *FeatureAwareOutputHandler) featureEnabled(ctx models.ProjectCommandContext) bool { - shouldAllocate, err := p.FeatureAllocator.ShouldAllocate(feature.LogStreaming, ctx.Pull.BaseRepo.FullName) - - if err != nil { - ctx.Log.Err("unable to allocate for feature: %s, error: %s", feature.LogStreaming, err) - } - - return shouldAllocate -} - -func (p *FeatureAwareOutputHandler) Clear(ctx models.ProjectCommandContext) { - if !p.featureEnabled(ctx) { - return - } - - p.ProjectCommandOutputHandler.Clear(ctx) -} - -func (p *FeatureAwareOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { - if !p.featureEnabled(ctx) { - return - } - - p.ProjectCommandOutputHandler.Send(ctx, msg) -} - -func (p *FeatureAwareOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { - if !p.featureEnabled(ctx) { - return nil - } - - return p.ProjectCommandOutputHandler.SetJobURLWithStatus(ctx, cmdName, status) -} - // NoopProjectOutputHandler is a mock that doesn't do anything type NoopProjectOutputHandler struct{} func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { } -func (p *NoopProjectOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error { - return nil -} +func (p *NoopProjectOutputHandler) Register(projectInfo string, receiver chan string) {} +func (p *NoopProjectOutputHandler) Deregister(projectInfo string, receiver chan string) {} func (p *NoopProjectOutputHandler) Handle() { } diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go index 6d8d47332..a37a2c90b 100644 --- a/server/handlers/project_command_output_handler_test.go +++ b/server/handlers/project_command_output_handler_test.go @@ -4,12 +4,8 @@ import ( "errors" "sync" "testing" - "time" - "github.com/petergtz/pegomock" "github.com/runatlantis/atlantis/server/events/models" - featuremocks "github.com/runatlantis/atlantis/server/feature/mocks" - featurematchers "github.com/runatlantis/atlantis/server/feature/mocks/matchers" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/handlers/mocks/matchers" @@ -70,139 +66,106 @@ func TestProjectCommandOutputHandler(t *testing.T) { Msg := "Test Terraform Output" ctx := createTestProjectCmdContext(t) - t.Run("Should Receive Message Sent in the ProjectCmdOutput channel", func(t *testing.T) { + t.Run("receive message from main channel", func(t *testing.T) { var wg sync.WaitGroup var expectedMsg string projectOutputHandler := createProjectCommandOutputHandler(t) - wg.Add(1) ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.PullInfo(), ch) + + wg.Add(1) + + // read from channel go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { + for msg := range ch { expectedMsg = msg wg.Done() - return nil - }) - Ok(t, err) + } }() projectOutputHandler.Send(ctx, Msg) - close(ch) - - // Wait for the msg to be read. wg.Wait() + close(ch) Equals(t, expectedMsg, Msg) }) - t.Run("Should Clear ProjectOutputBuffer when new Plan", func(t *testing.T) { + t.Run("clear buffer", func(t *testing.T) { var wg sync.WaitGroup projectOutputHandler := createProjectCommandOutputHandler(t) - wg.Add(1) - ch := make(chan string) - go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { - wg.Done() - return nil - }) - Ok(t, err) - }() - - projectOutputHandler.Send(ctx, Msg) - - // Wait for the msg to be read. - wg.Wait() - - // Send a clear msg - projectOutputHandler.Clear(ctx) - close(ch) - - dfProjectOutputHandler, ok := projectOutputHandler.(*handlers.AsyncProjectCommandOutputHandler) - assert.True(t, ok) - - // Wait for the clear msg to be received by handle() - time.Sleep(1 * time.Second) - assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) - }) + - t.Run("Should Cleanup receiverBuffers receiving WS channel closed", func(t *testing.T) { - var wg sync.WaitGroup + ch := make(chan string) - projectOutputHandler := createProjectCommandOutputHandler(t) + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.PullInfo(), ch) wg.Add(1) - ch := make(chan string) + // read from channel asynchronously go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { - wg.Done() - return nil - }) - Ok(t, err) + for msg := range ch { + // we are done once we receive the clear message. + // prior message doesn't matter for this test. + if msg == models.LogStreamingClearMsg { + wg.Done() + } + } }() + // send regular message followed by clear message projectOutputHandler.Send(ctx, Msg) - - // Wait for the msg to be read. + projectOutputHandler.Clear(ctx) wg.Wait() - - // Close chan to execute cleanup. close(ch) - time.Sleep(1 * time.Second) dfProjectOutputHandler, ok := projectOutputHandler.(*handlers.AsyncProjectCommandOutputHandler) assert.True(t, ok) - x := dfProjectOutputHandler.GetReceiverBufferForPull(ctx.PullInfo()) - assert.Empty(t, x) + assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) }) - t.Run("Should copy over existing log messages to new WS channels", func(t *testing.T) { + t.Run("copies buffer to new channels", func(t *testing.T) { var wg sync.WaitGroup projectOutputHandler := createProjectCommandOutputHandler(t) - wg.Add(1) - ch := make(chan string) - go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { - wg.Done() - return nil - }) - Ok(t, err) - }() - + // send first message to populated the buffer projectOutputHandler.Send(ctx, Msg) - // Wait for the msg to be read. - wg.Wait() - - // Close channel to close prev connection. - // This should close the first go routine with receive call. - close(ch) - - ch = make(chan string) - - // Expecting two calls to callback. - wg.Add(2) + ch := make(chan string) receivedMsgs := []string{} + + wg.Add(1) + // read from channel asynchronously go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { + for msg := range ch { receivedMsgs = append(receivedMsgs, msg) - wg.Done() - return nil - }) - Ok(t, err) - }() - // Make sure addChan gets the buffer lock and adds ch to the map. - time.Sleep(1 * time.Second) + // we're only expecting two messages here. + if len(receivedMsgs) >= 2 { + wg.Done() + } + } + }() + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.PullInfo(), ch) projectOutputHandler.Send(ctx, Msg) - - // Wait for the message to be read. wg.Wait() close(ch) @@ -251,52 +214,3 @@ func TestProjectCommandOutputHandler(t *testing.T) { assert.Error(t, err) }) } - -func TestFeatureAwareOutputHandler(t *testing.T) { - ctx := createTestProjectCmdContext(t) - RegisterMockTestingT(t) - projectOutputHandler := mocks.NewMockProjectCommandOutputHandler() - - featureAllocator := featuremocks.NewMockAllocator() - featureAwareOutputHandler := handlers.FeatureAwareOutputHandler{ - FeatureAllocator: featureAllocator, - ProjectCommandOutputHandler: projectOutputHandler, - } - - cases := []struct { - Description string - FeatureFlagEnabled bool - }{ - { - Description: "noop when feature is disabled", - FeatureFlagEnabled: false, - }, - { - Description: "delegate when feature is enabled", - FeatureFlagEnabled: true, - }, - } - - for _, c := range cases { - t.Run(c.Description, func(t *testing.T) { - var expectedWasCalled func() *EqMatcher - - if c.FeatureFlagEnabled { - expectedWasCalled = Once - } else { - expectedWasCalled = Never - } - When(featureAllocator.ShouldAllocate(featurematchers.AnyFeatureName(), pegomock.AnyString())).ThenReturn(c.FeatureFlagEnabled, nil) - - err := featureAwareOutputHandler.SetJobURLWithStatus(ctx, models.PlanCommand, models.PendingCommitStatus) - Ok(t, err) - projectOutputHandler.VerifyWasCalled(expectedWasCalled()).SetJobURLWithStatus(matchers.AnyModelsProjectCommandContext(), matchers.AnyModelsCommandName(), matchers.AnyModelsCommitStatus()) - - featureAwareOutputHandler.Clear(ctx) - projectOutputHandler.VerifyWasCalled(expectedWasCalled()).Clear(matchers.AnyModelsProjectCommandContext()) - - featureAwareOutputHandler.Send(ctx, "test") - projectOutputHandler.VerifyWasCalled(expectedWasCalled()).Send(matchers.AnyModelsProjectCommandContext(), pegomock.AnyString()) - }) - } -} diff --git a/server/handlers/websocket_handler.go b/server/handlers/websocket_handler.go deleted file mode 100644 index 42202a3c6..000000000 --- a/server/handlers/websocket_handler.go +++ /dev/null @@ -1,65 +0,0 @@ -package handlers - -import ( - "net/http" - - "github.com/gorilla/websocket" - "github.com/runatlantis/atlantis/server/logging" -) - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_handler.go WebsocketHandler - -type WebsocketHandler interface { - Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketConnectionWrapper, error) - SetReadHandler(w WebsocketConnectionWrapper) - SetCloseHandler(w WebsocketConnectionWrapper, receiver chan string) -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_connection_wrapper.go WebsocketConnectionWrapper - -type WebsocketConnectionWrapper interface { - ReadMessage() (messageType int, p []byte, err error) - WriteMessage(messageType int, data []byte) error - SetCloseHandler(h func(code int, text string) error) -} - -type DefaultWebsocketHandler struct { - handler websocket.Upgrader - Logger logging.SimpleLogging -} - -func NewWebsocketHandler(logger logging.SimpleLogging) WebsocketHandler { - h := websocket.Upgrader{} - h.CheckOrigin = func(r *http.Request) bool { return true } - return &DefaultWebsocketHandler{ - handler: h, - Logger: logger, - } -} - -func (wh *DefaultWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketConnectionWrapper, error) { - return wh.handler.Upgrade(w, r, responseHeader) -} - -func (wh *DefaultWebsocketHandler) SetReadHandler(w WebsocketConnectionWrapper) { - for { - _, _, err := w.ReadMessage() - if err != nil { - // CloseGoingAway (1001) when a browser tab is closed. - // Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - wh.Logger.Warn("Failed to read WS message: %s", err) - } - return - } - } -} - -func (wh *DefaultWebsocketHandler) SetCloseHandler(w WebsocketConnectionWrapper, receiver chan string) { - w.SetCloseHandler(func(code int, text string) error { - // Close the channnel after websocket connection closed. - // Will gracefully exit the ProjectCommandOutputHandler.Receive() call and cleanup. - close(receiver) - return nil - }) -} diff --git a/server/feature/allocator.go b/server/lyft/feature/allocator.go similarity index 100% rename from server/feature/allocator.go rename to server/lyft/feature/allocator.go diff --git a/server/feature/mocks/matchers/feature_name.go b/server/lyft/feature/mocks/matchers/feature_name.go similarity index 92% rename from server/feature/mocks/matchers/feature_name.go rename to server/lyft/feature/mocks/matchers/feature_name.go index 93717f382..b09ae7406 100644 --- a/server/feature/mocks/matchers/feature_name.go +++ b/server/lyft/feature/mocks/matchers/feature_name.go @@ -5,7 +5,7 @@ import ( "github.com/petergtz/pegomock" "reflect" - feature "github.com/runatlantis/atlantis/server/feature" + feature "github.com/runatlantis/atlantis/server/lyft/feature" ) func AnyFeatureName() feature.Name { diff --git a/server/feature/mocks/mock_allocator.go b/server/lyft/feature/mocks/mock_allocator.go similarity index 96% rename from server/feature/mocks/mock_allocator.go rename to server/lyft/feature/mocks/mock_allocator.go index d97ee0018..3c25b162c 100644 --- a/server/feature/mocks/mock_allocator.go +++ b/server/lyft/feature/mocks/mock_allocator.go @@ -1,11 +1,11 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/feature (interfaces: Allocator) +// Source: github.com/runatlantis/atlantis/server/lyft/feature (interfaces: Allocator) package mocks import ( pegomock "github.com/petergtz/pegomock" - feature "github.com/runatlantis/atlantis/server/feature" + feature "github.com/runatlantis/atlantis/server/lyft/feature" "reflect" "time" ) diff --git a/server/feature/names.go b/server/lyft/feature/names.go similarity index 100% rename from server/feature/names.go rename to server/lyft/feature/names.go diff --git a/server/server.go b/server/server.go index a10e6d5de..19872f64a 100644 --- a/server/server.go +++ b/server/server.go @@ -35,11 +35,12 @@ import ( "github.com/mitchellh/go-homedir" "github.com/runatlantis/atlantis/server/core/db" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/lyft/aws" "github.com/runatlantis/atlantis/server/lyft/aws/sns" lyftDecorators "github.com/runatlantis/atlantis/server/lyft/decorators" + "github.com/runatlantis/atlantis/server/lyft/feature" + "github.com/runatlantis/atlantis/server/lyft/scheduled" assetfs "github.com/elazarl/go-bindata-assetfs" "github.com/gorilla/mux" @@ -48,6 +49,7 @@ import ( "github.com/runatlantis/atlantis/server/controllers" events_controllers "github.com/runatlantis/atlantis/server/controllers/events" "github.com/runatlantis/atlantis/server/controllers/templates" + "github.com/runatlantis/atlantis/server/controllers/websocket" "github.com/runatlantis/atlantis/server/core/locking" "github.com/runatlantis/atlantis/server/core/runtime" "github.com/runatlantis/atlantis/server/core/runtime/policy" @@ -108,7 +110,7 @@ type Server struct { SSLCertFile string SSLKeyFile string Drainer *events.Drainer - ScheduledExecutorService *ScheduledExecutorService + ScheduledExecutorService *scheduled.ExecutorService ProjectCmdOutputHandler handlers.ProjectCommandOutputHandler } @@ -200,7 +202,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { } var err error - rawGithubClient, err = vcs.NewGithubClient(userConfig.GithubHostname, githubCredentials, logger) + rawGithubClient, err = vcs.NewGithubClient(userConfig.GithubHostname, githubCredentials, logger, userConfig.VCSStatusName) if err != nil { return nil, err } @@ -294,7 +296,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { return nil, errors.Wrap(err, "initializing webhooks") } vcsClient := vcs.NewClientProxy(githubClient, gitlabClient, bitbucketCloudClient, bitbucketServerClient, azuredevopsClient) - commitStatusUpdater := &events.DefaultCommitStatusUpdater{Client: vcsClient, StatusName: userConfig.VCSStatusName} + commitStatusUpdater := &events.DefaultCommitStatusUpdater{Client: vcsClient, TitleBuilder: vcs.StatusTitleBuilder{TitlePrefix: userConfig.VCSStatusName}} binDir, err := mkSubDir(userConfig.DataDir, BinDirName) @@ -338,16 +340,15 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { var projectCmdOutputHandler handlers.ProjectCommandOutputHandler // When TFE is enabled log streaming is not necessary. - if userConfig.TFEToken != "" || userConfig.TFEHostname != "" { + if userConfig.TFEToken != "" { projectCmdOutputHandler = &handlers.NoopProjectOutputHandler{} } else { projectCmdOutput := make(chan *models.ProjectCmdOutputLine) - projectCmdOutputHandler = handlers.NewFeatureAwareOutputHandler( + projectCmdOutputHandler = handlers.NewInstrumentedProjectCommandOutputHandler( projectCmdOutput, commitStatusUpdater, router, logger, - featureAllocator, statsScope.Scope("api"), ) } @@ -597,11 +598,17 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { return nil, errors.Wrap(err, "initializing new aws session") } - snsWriter := sns.NewWriterWithStats( - session, - userConfig.LyftAuditJobsSnsTopicArn, - statsScope, - ) + var snsWriter sns.Writer + + if userConfig.LyftAuditJobsSnsTopicArn != "" { + snsWriter = sns.NewWriterWithStats( + session, + userConfig.LyftAuditJobsSnsTopicArn, + statsScope, + ) + } else { + snsWriter = sns.NewNoopWriter() + } auditProjectCmdRunner := &lyftDecorators.AuditProjectCommandWrapper{ SnsWriter: snsWriter, @@ -737,16 +744,21 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { DeleteLockCommand: deleteLockCommand, } + wsMux := websocket.NewMultiplexor( + logger, + controllers.ProjectInfoKeyGenerator{}, + projectCmdOutputHandler, + ) + jobsController := &controllers.JobsController{ - AtlantisVersion: config.AtlantisVersion, - AtlantisURL: parsedURL, - Logger: logger, - ProjectJobsTemplate: templates.ProjectJobsTemplate, - ProjectJobsErrorTemplate: templates.ProjectJobsErrorTemplate, - Db: boltdb, - WebsocketHandler: handlers.NewWebsocketHandler(logger), - ProjectCommandOutputHandler: projectCmdOutputHandler, - StatsScope: statsScope.Scope("api"), + AtlantisVersion: config.AtlantisVersion, + AtlantisURL: parsedURL, + Logger: logger, + ProjectJobsTemplate: templates.ProjectJobsTemplate, + ProjectJobsErrorTemplate: templates.ProjectJobsErrorTemplate, + Db: boltdb, + WsMux: wsMux, + StatsScope: statsScope.Scope("api"), } eventsController := &events_controllers.VCSEventsController{ @@ -775,9 +787,10 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { GithubSetupComplete: githubAppEnabled, GithubHostname: userConfig.GithubHostname, GithubOrg: userConfig.GithubOrg, + GithubStatusName: userConfig.VCSStatusName, } - scheduledExecutorService := NewScheduledExecutorService( + scheduledExecutorService := scheduled.NewExecutorService( events.NewFileWorkDirIterator( githubClient, eventParser, @@ -795,7 +808,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { LogStreamResourceCleaner: projectCmdOutputHandler, // using a specific template to signal that this is from an async process - PullClosedTemplate: NewGCStaleClosedPull(), + PullClosedTemplate: scheduled.NewGCStaleClosedPull(), }, // using a pullclosed executor for stale open PRs. Naming is weird, we need to come up with something better. @@ -808,7 +821,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { LogStreamResourceCleaner: projectCmdOutputHandler, // using a specific template to signal that this is from an async process - PullClosedTemplate: NewGCStaleOpenPull(), + PullClosedTemplate: scheduled.NewGCStaleOpenPull(), }, rawGithubClient,