From f887d7ed3976e0acb3a5a74b0696586e91dc5ace Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Thu, 11 Nov 2021 14:55:33 -0800 Subject: [PATCH 01/10] Refactor job controller and websocket logic. --- .../events/events_controller_e2e_test.go | 2 +- ...aming_controller.go => jobs_controller.go} | 62 +++++------ ...roller_test.go => jobs_controller_test.go} | 0 server/controllers/websocket/mux.go | 62 +++++++++++ server/controllers/websocket/writer.go | 70 ++++++++++++ server/core/terraform/terraform_client.go | 2 +- .../terraform_client_internal_test.go | 4 +- .../core/terraform/terraform_client_test.go | 4 +- server/events/command_runner.go | 2 +- server/events/command_runner_test.go | 4 +- server/events/project_command_runner.go | 2 +- server/events/project_command_runner_test.go | 4 +- ...rumented_project_command_output_handler.go | 15 ++- .../project_command_output_handler.go | 101 +++--------------- .../project_command_output_handler_test.go | 52 --------- server/handlers/websocket_handler.go | 66 ------------ server/{ => lyft}/feature/allocator.go | 0 .../feature/mocks/matchers/feature_name.go | 2 +- .../feature/mocks/mock_allocator.go | 4 +- server/{ => lyft}/feature/names.go | 0 server/server.go | 29 ++--- 21 files changed, 215 insertions(+), 272 deletions(-) rename server/controllers/{logstreaming_controller.go => jobs_controller.go} (74%) rename server/controllers/{logstreaming_controller_test.go => jobs_controller_test.go} (100%) create mode 100644 server/controllers/websocket/mux.go create mode 100644 server/controllers/websocket/writer.go delete mode 100644 server/handlers/websocket_handler.go rename server/{ => lyft}/feature/allocator.go (100%) rename server/{ => lyft}/feature/mocks/matchers/feature_name.go (92%) rename server/{ => lyft}/feature/mocks/mock_allocator.go (96%) rename server/{ => lyft}/feature/names.go (100%) diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index 57c95048b..57491091a 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -36,9 +36,9 @@ import ( "github.com/runatlantis/atlantis/server/events/webhooks" "github.com/runatlantis/atlantis/server/events/yaml" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/controllers/logstreaming_controller.go b/server/controllers/jobs_controller.go similarity index 74% rename from server/controllers/logstreaming_controller.go rename to server/controllers/jobs_controller.go index 6bfd10e0a..8c546d981 100644 --- a/server/controllers/logstreaming_controller.go +++ b/server/controllers/jobs_controller.go @@ -8,9 +8,10 @@ import ( "strconv" "github.com/gorilla/mux" - "github.com/gorilla/websocket" stats "github.com/lyft/gostats" + "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/controllers/templates" + "github.com/runatlantis/atlantis/server/controllers/websocket" "github.com/runatlantis/atlantis/server/core/db" "github.com/runatlantis/atlantis/server/events/metrics" "github.com/runatlantis/atlantis/server/events/models" @@ -19,18 +20,31 @@ import ( ) type JobsController struct { - AtlantisVersion string - AtlantisURL *url.URL - Logger logging.SimpleLogging - ProjectJobsTemplate templates.TemplateWriter - ProjectJobsErrorTemplate templates.TemplateWriter - Db *db.BoltDB - - WebsocketHandler handlers.WebsocketHandler + AtlantisVersion string + AtlantisURL *url.URL + Logger logging.SimpleLogging + ProjectJobsTemplate templates.TemplateWriter + ProjectJobsErrorTemplate templates.TemplateWriter + Db *db.BoltDB + WsMux *websocket.Multiplexor + + websocketWriter *websocket.Writer ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler StatsScope stats.Scope } +type ProjectInfoKeyGenerator struct{} + +func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) { + projectInfo, err := newProjectInfo(r) + + if err != nil { + return "", errors.Wrap(err, "creating project info") + } + + return projectInfo.String(), nil +} + type pullInfo struct { org string repo string @@ -132,37 +146,9 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request) } func (j *JobsController) getProjectJobsWS(w http.ResponseWriter, r *http.Request) error { - projectInfo, err := newProjectInfo(r) - if err != nil { - j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) - return err - } - - c, err := j.WebsocketHandler.Upgrade(w, r, nil) - if err != nil { - j.Logger.Warn("Failed to upgrade websocket: %s", err) - return err - } - - // Buffer size set to 1000 to ensure messages get queued (upto 1000) if the receiverCh is not ready to - // receive messages before the channel is closed and resources cleaned up. - receiver := make(chan string, 1000) - j.WebsocketHandler.SetCloseHandler(c, receiver) - - // Add a reader goroutine to listen for socket.close() events. - go j.WebsocketHandler.SetReadHandler(c) - - pull := projectInfo.String() - err = j.ProjectCommandOutputHandler.Receive(pull, receiver, func(msg string) error { - if err := c.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil { - j.Logger.Warn("Failed to write ws message: %s", err) - return err - } - return nil - }) + err := j.wsMux.Handle(w, r) if err != nil { - j.Logger.Warn("Failed to receive message: %s", err) j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) return err } diff --git a/server/controllers/logstreaming_controller_test.go b/server/controllers/jobs_controller_test.go similarity index 100% rename from server/controllers/logstreaming_controller_test.go rename to server/controllers/jobs_controller_test.go diff --git a/server/controllers/websocket/mux.go b/server/controllers/websocket/mux.go new file mode 100644 index 000000000..6a5b407bb --- /dev/null +++ b/server/controllers/websocket/mux.go @@ -0,0 +1,62 @@ +package websocket + +import ( + "net/http" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/logging" +) + +// PartitionKeyGenerator generates partition keys for the multiplexor +type PartitionKeyGenerator interface { + Generate(r *http.Request) (string, error) +} + +// PartitionRegistry is the registry holding each partition +// and is responsible for registering/deregistering new buffers +type PartitionRegistry interface { + Register(key string, buffer chan string) + Deregister(key string, buffer chan string) +} + +// Multiplexor is responsible for handling the data transfer between the storage layer +// and the registry. Note this is still a WIP as right now the registry is assumed to handle +// everything. +type Multiplexor struct { + writer *Writer + keyGenerator PartitionKeyGenerator + registry PartitionRegistry +} + +func NewMultiplexor(log logging.SimpleLogging, keyGenerator PartitionKeyGenerator, registry PartitionRegistry) *Multiplexor { + upgrader := websocket.Upgrader{} + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + return &Multiplexor{ + writer: &Writer{ + upgrader: upgrader, + log: log, + }, + keyGenerator: keyGenerator, + } +} + +// Handle should be called for a given websocket request. It blocks +// while writing to the websocket until the buffer is closed. +func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error { + key, err := m.keyGenerator.Generate(r) + + if err != nil { + return errors.Wrapf(err, "generating partition key") + } + + // Buffer size set to 1000 to ensure messages get queued. + // TODO: make buffer size configurable + buffer := make(chan string, 1000) + + // spinning up a goroutine for this since we are attempting to block on the read side. + go m.registry.Register(key, buffer) + defer m.registry.Deregister(key, buffer) + + return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key) +} diff --git a/server/controllers/websocket/writer.go b/server/controllers/websocket/writer.go new file mode 100644 index 000000000..7c266d474 --- /dev/null +++ b/server/controllers/websocket/writer.go @@ -0,0 +1,70 @@ +package websocket + +import ( + "net/http" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/logging" +) + +func NewWriter(log logging.SimpleLogging) *Writer { + upgrader := websocket.Upgrader{} + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + return &Writer{ + upgrader: upgrader, + log: log, + } +} + +type Writer struct { + upgrader websocket.Upgrader + + //TODO: Remove dependency on atlantis logger here if we upstream this. + log logging.SimpleLogging +} + +func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error { + conn, err := w.upgrader.Upgrade(rw, r, nil) + + if err != nil { + return errors.Wrap(err, "upgrading websocket connection") + } + + conn.SetCloseHandler(func(code int, text string) error { + // Close the channnel after websocket connection closed. + // Will gracefully exit the ProjectCommandOutputHandler.Receive() call and cleanup. + // is it good practice to close at the receiver? Probably not, we should figure out a better + // way to handle this case + close(input) + return nil + }) + + // Add a reader goroutine to listen for socket.close() events. + go w.setReadHandler(conn) + + // block on reading our input channel + for msg := range input { + if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil { + w.log.Warn("Failed to write ws message: %s", err) + return err + } + } + + return nil +} + +func (w *Writer) setReadHandler(c *websocket.Conn) { + for { + _, _, err := c.ReadMessage() + if err != nil { + // CloseGoingAway (1001) when a browser tab is closed. + // Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + w.log.Warn("Failed to read WS message: %s", err) + } + return + } + } + +} diff --git a/server/core/terraform/terraform_client.go b/server/core/terraform/terraform_client.go index 0bca07a4b..74ee06597 100644 --- a/server/core/terraform/terraform_client.go +++ b/server/core/terraform/terraform_client.go @@ -34,9 +34,9 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/terraform/ansi" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" ) var LogStreamingValidCmds = [...]string{"init", "plan", "apply"} diff --git a/server/core/terraform/terraform_client_internal_test.go b/server/core/terraform/terraform_client_internal_test.go index 8c6e67ffb..a01d818d2 100644 --- a/server/core/terraform/terraform_client_internal_test.go +++ b/server/core/terraform/terraform_client_internal_test.go @@ -11,10 +11,10 @@ import ( version "github.com/hashicorp/go-version" . "github.com/petergtz/pegomock" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/feature" - fmocks "github.com/runatlantis/atlantis/server/feature/mocks" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/core/terraform/terraform_client_test.go b/server/core/terraform/terraform_client_test.go index a471eb746..0abcc24cd 100644 --- a/server/core/terraform/terraform_client_test.go +++ b/server/core/terraform/terraform_client_test.go @@ -29,10 +29,10 @@ import ( "github.com/runatlantis/atlantis/server/core/terraform" "github.com/runatlantis/atlantis/server/core/terraform/mocks" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/feature" - fmocks "github.com/runatlantis/atlantis/server/feature/mocks" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/events/command_runner.go b/server/events/command_runner.go index 755fc70ee..ee50f5b13 100644 --- a/server/events/command_runner.go +++ b/server/events/command_runner.go @@ -25,8 +25,8 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/vcs" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" "github.com/runatlantis/atlantis/server/recovery" gitlab "github.com/xanzy/go-gitlab" ) diff --git a/server/events/command_runner_test.go b/server/events/command_runner_test.go index 69daf3cd2..a6da9698d 100644 --- a/server/events/command_runner_test.go +++ b/server/events/command_runner_test.go @@ -36,8 +36,8 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/models/fixtures" vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks" - "github.com/runatlantis/atlantis/server/feature" - fmocks "github.com/runatlantis/atlantis/server/feature/mocks" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go index 5145fc80d..fe3afb754 100644 --- a/server/events/project_command_runner.go +++ b/server/events/project_command_runner.go @@ -24,9 +24,9 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/webhooks" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" ) // DirNotExistErr is an error caused by the directory not existing. diff --git a/server/events/project_command_runner_test.go b/server/events/project_command_runner_test.go index 00831b48d..46a5c6d61 100644 --- a/server/events/project_command_runner_test.go +++ b/server/events/project_command_runner_test.go @@ -29,10 +29,10 @@ import ( "github.com/runatlantis/atlantis/server/events/mocks/matchers" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" - fmocks "github.com/runatlantis/atlantis/server/feature/mocks" handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" + "github.com/runatlantis/atlantis/server/lyft/feature" + fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks" . "github.com/runatlantis/atlantis/testing" ) diff --git a/server/handlers/instrumented_project_command_output_handler.go b/server/handlers/instrumented_project_command_output_handler.go index eb9effcc7..d02bf575b 100644 --- a/server/handlers/instrumented_project_command_output_handler.go +++ b/server/handlers/instrumented_project_command_output_handler.go @@ -4,6 +4,7 @@ import ( "fmt" stats "github.com/lyft/gostats" + "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/logging" ) @@ -13,10 +14,20 @@ type InstrumentedProjectCommandOutputHandler struct { logger logging.SimpleLogging } -func NewInstrumentedProjectCommandOutputHandler(prjCmdOutputHandler ProjectCommandOutputHandler, statsScope stats.Scope, logger logging.SimpleLogging) ProjectCommandOutputHandler { +func NewInstrumentedProjectCommandOutputHandler(projectCmdOutput chan *models.ProjectCmdOutputLine, + projectStatusUpdater ProjectStatusUpdater, + projectJobURLGenerator ProjectJobURLGenerator, + logger logging.SimpleLogging, + scope stats.Scope) ProjectCommandOutputHandler { + prjCmdOutputHandler := NewAsyncProjectCommandOutputHandler( + projectCmdOutput, + projectStatusUpdater, + projectJobURLGenerator, + logger, + ) return &InstrumentedProjectCommandOutputHandler{ ProjectCommandOutputHandler: prjCmdOutputHandler, - numWSConnnections: statsScope.Scope("getprojectjobs").Scope("websocket").NewGauge("connections"), + numWSConnnections: scope.Scope("getprojectjobs").Scope("websocket").NewGauge("connections"), logger: logger, } } diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go index 0b8a7e128..ff2ef7c3e 100644 --- a/server/handlers/project_command_output_handler.go +++ b/server/handlers/project_command_output_handler.go @@ -2,12 +2,9 @@ package handlers import ( "fmt" - "sync" - - stats "github.com/lyft/gostats" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/logging" + "sync" ) // AsyncProjectCommandOutputHandler is a handler to transport terraform client @@ -51,8 +48,12 @@ type ProjectCommandOutputHandler interface { // Send will enqueue the msg and wait for Handle() to receive the message. Send(ctx models.ProjectCommandContext, msg string) - // Receive will create a channel for projectPullInfo and run a callback function argument when the new channel receives a message. - Receive(projectInfo string, receiver chan string, callback func(msg string) error) error + // Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting + // to read the channel in the same goroutine + Register(projectInfo string, receiver chan string) + + // Deregister removes a channel from successive updates and closes it. + Deregister(projectInfo string, receiver chan string) // Listens for msg from channel Handle() @@ -93,20 +94,11 @@ func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext } } -func (p *AsyncProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error { - // Avoid deadlock when projectOutputBuffer size is greater than the channel (currently set to 1000) - // Running this as a goroutine allows for the channel to be read in callback - go p.addChan(receiver, projectInfo) - defer p.removeChan(projectInfo, receiver) +func (p *AsyncProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { + p.addChan(receiver, projectInfo) +} - for msg := range receiver { - if err := callback(msg); err != nil { - return err - } - } - return nil -} func (p *AsyncProjectCommandOutputHandler) Handle() { for msg := range p.projectCmdOutput { @@ -189,8 +181,8 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string } //Remove channel, so client no longer receives Terraform output -func (p *AsyncProjectCommandOutputHandler) removeChan(pull string, ch chan string) { - p.logger.Info(fmt.Sprintf("Removing channel for %s", pull)) +func (p *AsyncProjectCommandOutputHandler) Deregister(pull string, ch chan string) { + p.logger.Debug(fmt.Sprintf("Removing channel for %s", pull)) p.receiverBuffersLock.Lock() delete(p.receiverBuffers[pull], ch) p.receiverBuffersLock.Unlock() @@ -217,79 +209,14 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pull string) { p.receiverBuffersLock.Unlock() } -// [ORCA-955] - Remove feature flag for log-streaming -// FeatureAwareOutputHandler is a decorator that add feature allocator -// functionality to the AsyncProjectCommandOutputHandler -type FeatureAwareOutputHandler struct { - FeatureAllocator feature.Allocator - ProjectCommandOutputHandler -} - -func NewFeatureAwareOutputHandler( - projectCmdOutput chan *models.ProjectCmdOutputLine, - projectStatusUpdater ProjectStatusUpdater, - projectJobURLGenerator ProjectJobURLGenerator, - logger logging.SimpleLogging, - featureAllocator feature.Allocator, - scope stats.Scope, -) ProjectCommandOutputHandler { - prjCmdOutputHandler := NewAsyncProjectCommandOutputHandler( - projectCmdOutput, - projectStatusUpdater, - projectJobURLGenerator, - logger, - ) - return &FeatureAwareOutputHandler{ - FeatureAllocator: featureAllocator, - ProjectCommandOutputHandler: NewInstrumentedProjectCommandOutputHandler(prjCmdOutputHandler, scope, logger), - } -} - -// Helper function to check if the log-streaming feature is enabled -// It dynamically decides based on repo name that is defined in the models.ProjectCommandContext -func (p *FeatureAwareOutputHandler) featureEnabled(ctx models.ProjectCommandContext) bool { - shouldAllocate, err := p.FeatureAllocator.ShouldAllocate(feature.LogStreaming, ctx.Pull.BaseRepo.FullName) - - if err != nil { - ctx.Log.Err("unable to allocate for feature: %s, error: %s", feature.LogStreaming, err) - } - - return shouldAllocate -} - -func (p *FeatureAwareOutputHandler) Clear(ctx models.ProjectCommandContext) { - if !p.featureEnabled(ctx) { - return - } - - p.ProjectCommandOutputHandler.Clear(ctx) -} - -func (p *FeatureAwareOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { - if !p.featureEnabled(ctx) { - return - } - - p.ProjectCommandOutputHandler.Send(ctx, msg) -} - -func (p *FeatureAwareOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { - if !p.featureEnabled(ctx) { - return nil - } - - return p.ProjectCommandOutputHandler.SetJobURLWithStatus(ctx, cmdName, status) -} - // NoopProjectOutputHandler is a mock that doesn't do anything type NoopProjectOutputHandler struct{} func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { } -func (p *NoopProjectOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error { - return nil -} +func (p *NoopProjectOutputHandler) Register(projectInfo string, receiver chan string) {} +func (p *NoopProjectOutputHandler) Deregister(projectInfo string, receiver chan string) {} func (p *NoopProjectOutputHandler) Handle() { } diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go index 6f183abf2..b7c6af914 100644 --- a/server/handlers/project_command_output_handler_test.go +++ b/server/handlers/project_command_output_handler_test.go @@ -6,10 +6,7 @@ import ( "testing" "time" - "github.com/petergtz/pegomock" "github.com/runatlantis/atlantis/server/events/models" - featuremocks "github.com/runatlantis/atlantis/server/feature/mocks" - featurematchers "github.com/runatlantis/atlantis/server/feature/mocks/matchers" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/handlers/mocks/matchers" @@ -251,52 +248,3 @@ func TestProjectCommandOutputHandler(t *testing.T) { assert.Error(t, err) }) } - -func TestFeatureAwareOutputHandler(t *testing.T) { - ctx := createTestProjectCmdContext(t) - RegisterMockTestingT(t) - projectOutputHandler := mocks.NewMockProjectCommandOutputHandler() - - featureAllocator := featuremocks.NewMockAllocator() - featureAwareOutputHandler := handlers.FeatureAwareOutputHandler{ - FeatureAllocator: featureAllocator, - ProjectCommandOutputHandler: projectOutputHandler, - } - - cases := []struct { - Description string - FeatureFlagEnabled bool - }{ - { - Description: "noop when feature is disabled", - FeatureFlagEnabled: false, - }, - { - Description: "delegate when feature is enabled", - FeatureFlagEnabled: true, - }, - } - - for _, c := range cases { - t.Run(c.Description, func(t *testing.T) { - var expectedWasCalled func() *EqMatcher - - if c.FeatureFlagEnabled { - expectedWasCalled = Once - } else { - expectedWasCalled = Never - } - When(featureAllocator.ShouldAllocate(featurematchers.AnyFeatureName(), pegomock.AnyString())).ThenReturn(c.FeatureFlagEnabled, nil) - - err := featureAwareOutputHandler.SetJobURLWithStatus(ctx, models.PlanCommand, models.PendingCommitStatus) - Ok(t, err) - projectOutputHandler.VerifyWasCalled(expectedWasCalled()).SetJobURLWithStatus(matchers.AnyModelsProjectCommandContext(), matchers.AnyModelsCommandName(), matchers.AnyModelsCommitStatus()) - - featureAwareOutputHandler.Clear(ctx) - projectOutputHandler.VerifyWasCalled(expectedWasCalled()).Clear(matchers.AnyModelsProjectCommandContext()) - - featureAwareOutputHandler.Send(ctx, "test") - projectOutputHandler.VerifyWasCalled(expectedWasCalled()).Send(matchers.AnyModelsProjectCommandContext(), pegomock.AnyString()) - }) - } -} diff --git a/server/handlers/websocket_handler.go b/server/handlers/websocket_handler.go deleted file mode 100644 index abc32fbe6..000000000 --- a/server/handlers/websocket_handler.go +++ /dev/null @@ -1,66 +0,0 @@ -package handlers - -import ( - "net/http" - - "github.com/gorilla/websocket" - "github.com/runatlantis/atlantis/server/logging" -) - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_handler.go WebsocketHandler - -type WebsocketHandler interface { - Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketConnectionWrapper, error) - SetReadHandler(w WebsocketConnectionWrapper) - SetCloseHandler(w WebsocketConnectionWrapper, receiver chan string) -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_connection_wrapper.go WebsocketConnectionWrapper - -type WebsocketConnectionWrapper interface { - ReadMessage() (messageType int, p []byte, err error) - WriteMessage(messageType int, data []byte) error - SetCloseHandler(h func(code int, text string) error) -} - -type DefaultWebsocketHandler struct { - handler websocket.Upgrader - Logger logging.SimpleLogging -} - -func NewWebsocketHandler(logger logging.SimpleLogging) WebsocketHandler { - h := websocket.Upgrader{} - h.CheckOrigin = func(r *http.Request) bool { return true } - return &DefaultWebsocketHandler{ - handler: h, - Logger: logger, - } -} - -func (wh *DefaultWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketConnectionWrapper, error) { - return wh.handler.Upgrade(w, r, responseHeader) -} - -func (wh *DefaultWebsocketHandler) SetReadHandler(w WebsocketConnectionWrapper) { - for { - _, _, err := w.ReadMessage() - if err != nil { - // CloseGoingAway (1001) when a browser tab is closed. - // Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - wh.Logger.Warn("Failed to read WS message: %s", err) - } - return - } - } -} - -func (wh *DefaultWebsocketHandler) SetCloseHandler(w WebsocketConnectionWrapper, receiver chan string) { - w.SetCloseHandler(func(code int, text string) error { - // Close the channnel after websocket connection closed. - // Will gracefully exit the ProjectCommandOutputHandler.Receive() call and cleanup. - wh.Logger.Info("Close handler called") - close(receiver) - return nil - }) -} diff --git a/server/feature/allocator.go b/server/lyft/feature/allocator.go similarity index 100% rename from server/feature/allocator.go rename to server/lyft/feature/allocator.go diff --git a/server/feature/mocks/matchers/feature_name.go b/server/lyft/feature/mocks/matchers/feature_name.go similarity index 92% rename from server/feature/mocks/matchers/feature_name.go rename to server/lyft/feature/mocks/matchers/feature_name.go index 93717f382..b09ae7406 100644 --- a/server/feature/mocks/matchers/feature_name.go +++ b/server/lyft/feature/mocks/matchers/feature_name.go @@ -5,7 +5,7 @@ import ( "github.com/petergtz/pegomock" "reflect" - feature "github.com/runatlantis/atlantis/server/feature" + feature "github.com/runatlantis/atlantis/server/lyft/feature" ) func AnyFeatureName() feature.Name { diff --git a/server/feature/mocks/mock_allocator.go b/server/lyft/feature/mocks/mock_allocator.go similarity index 96% rename from server/feature/mocks/mock_allocator.go rename to server/lyft/feature/mocks/mock_allocator.go index d97ee0018..3c25b162c 100644 --- a/server/feature/mocks/mock_allocator.go +++ b/server/lyft/feature/mocks/mock_allocator.go @@ -1,11 +1,11 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/feature (interfaces: Allocator) +// Source: github.com/runatlantis/atlantis/server/lyft/feature (interfaces: Allocator) package mocks import ( pegomock "github.com/petergtz/pegomock" - feature "github.com/runatlantis/atlantis/server/feature" + feature "github.com/runatlantis/atlantis/server/lyft/feature" "reflect" "time" ) diff --git a/server/feature/names.go b/server/lyft/feature/names.go similarity index 100% rename from server/feature/names.go rename to server/lyft/feature/names.go diff --git a/server/server.go b/server/server.go index 52f21b942..7e6b71b65 100644 --- a/server/server.go +++ b/server/server.go @@ -35,11 +35,11 @@ import ( "github.com/mitchellh/go-homedir" "github.com/runatlantis/atlantis/server/core/db" "github.com/runatlantis/atlantis/server/events/yaml/valid" - "github.com/runatlantis/atlantis/server/feature" "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/lyft/aws" "github.com/runatlantis/atlantis/server/lyft/aws/sns" lyftDecorators "github.com/runatlantis/atlantis/server/lyft/decorators" + "github.com/runatlantis/atlantis/server/lyft/feature" "github.com/runatlantis/atlantis/server/lyft/scheduled" assetfs "github.com/elazarl/go-bindata-assetfs" @@ -49,6 +49,7 @@ import ( "github.com/runatlantis/atlantis/server/controllers" events_controllers "github.com/runatlantis/atlantis/server/controllers/events" "github.com/runatlantis/atlantis/server/controllers/templates" + "github.com/runatlantis/atlantis/server/controllers/websocket" "github.com/runatlantis/atlantis/server/core/locking" "github.com/runatlantis/atlantis/server/core/runtime" "github.com/runatlantis/atlantis/server/core/runtime/policy" @@ -343,12 +344,11 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { projectCmdOutputHandler = &handlers.NoopProjectOutputHandler{} } else { projectCmdOutput := make(chan *models.ProjectCmdOutputLine) - projectCmdOutputHandler = handlers.NewFeatureAwareOutputHandler( + projectCmdOutputHandler = handlers.NewInstrumentedProjectCommandOutputHandler( projectCmdOutput, commitStatusUpdater, router, logger, - featureAllocator, statsScope.Scope("api"), ) } @@ -744,16 +744,21 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { DeleteLockCommand: deleteLockCommand, } + wsMux := websocket.NewMultiplexor( + logger, + controllers.ProjectInfoKeyGenerator{}, + projectCmdOutputHandler, + ) + jobsController := &controllers.JobsController{ - AtlantisVersion: config.AtlantisVersion, - AtlantisURL: parsedURL, - Logger: logger, - ProjectJobsTemplate: templates.ProjectJobsTemplate, - ProjectJobsErrorTemplate: templates.ProjectJobsErrorTemplate, - Db: boltdb, - WebsocketHandler: handlers.NewWebsocketHandler(logger), - ProjectCommandOutputHandler: projectCmdOutputHandler, - StatsScope: statsScope.Scope("api"), + AtlantisVersion: config.AtlantisVersion, + AtlantisURL: parsedURL, + Logger: logger, + ProjectJobsTemplate: templates.ProjectJobsTemplate, + ProjectJobsErrorTemplate: templates.ProjectJobsErrorTemplate, + Db: boltdb, + WsMux: wsMux, + StatsScope: statsScope.Scope("api"), } eventsController := &events_controllers.VCSEventsController{ From b05259d87e47ed4ce6c30493f21707452a9edef8 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Thu, 11 Nov 2021 17:51:25 -0800 Subject: [PATCH 02/10] generate mocks. --- ...rumented_project_command_output_handler.go | 15 +- .../mock_project_command_output_handler.go | 206 ++++++++++-------- .../handlers/mocks/mock_resource_cleaner.go | 97 +++++++++ .../mock_websocket_connection_wrapper.go | 183 ---------------- .../handlers/mocks/mock_websocket_handler.go | 192 ---------------- .../mocks/mock_websocket_response_writer.go | 140 ------------ 6 files changed, 227 insertions(+), 606 deletions(-) create mode 100644 server/handlers/mocks/mock_resource_cleaner.go delete mode 100644 server/handlers/mocks/mock_websocket_connection_wrapper.go delete mode 100644 server/handlers/mocks/mock_websocket_handler.go delete mode 100644 server/handlers/mocks/mock_websocket_response_writer.go diff --git a/server/handlers/instrumented_project_command_output_handler.go b/server/handlers/instrumented_project_command_output_handler.go index d02bf575b..14ee571b1 100644 --- a/server/handlers/instrumented_project_command_output_handler.go +++ b/server/handlers/instrumented_project_command_output_handler.go @@ -32,7 +32,7 @@ func NewInstrumentedProjectCommandOutputHandler(projectCmdOutput chan *models.Pr } } -func (p *InstrumentedProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error { +func (p *InstrumentedProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { p.numWSConnnections.Inc() defer func() { // Log message to ensure numWSConnnections gauge is being updated properly. @@ -40,5 +40,16 @@ func (p *InstrumentedProjectCommandOutputHandler) Receive(projectInfo string, re p.logger.Info(fmt.Sprintf("Decreasing num of ws connections for project: %s", projectInfo)) p.numWSConnnections.Dec() }() - return p.ProjectCommandOutputHandler.Receive(projectInfo, receiver, callback) + p.ProjectCommandOutputHandler.Register(projectInfo, receiver) +} + +func (p *InstrumentedProjectCommandOutputHandler) Deregister(projectInfo string, receiver chan string) { + p.numWSConnnections.Inc() + defer func() { + // Log message to ensure numWSConnnections gauge is being updated properly. + // [ORCA-955] TODO: Remove when removing the feature flag for log streaming. + p.logger.Info(fmt.Sprintf("Decreasing num of ws connections for project: %s", projectInfo)) + p.numWSConnnections.Dec() + }() + p.ProjectCommandOutputHandler.Deregister(projectInfo, receiver) } diff --git a/server/handlers/mocks/mock_project_command_output_handler.go b/server/handlers/mocks/mock_project_command_output_handler.go index 462a59cde..08add4bd4 100644 --- a/server/handlers/mocks/mock_project_command_output_handler.go +++ b/server/handlers/mocks/mock_project_command_output_handler.go @@ -25,58 +25,51 @@ func NewMockProjectCommandOutputHandler(options ...pegomock.Option) *MockProject func (mock *MockProjectCommandOutputHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockProjectCommandOutputHandler) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 string) { +func (mock *MockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) + params := []pegomock.Param{ctx} + pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) { +func (mock *MockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) + params := []pegomock.Param{ctx, msg} + pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Handle() { +func (mock *MockProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{} - pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) + params := []pegomock.Param{projectInfo, receiver} + pegomock.GetGenericMockFrom(mock).Invoke("Register", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Receive(_param0 string, _param1 chan string, _param2 func(string) error) error { +func (mock *MockProjectCommandOutputHandler) Deregister(projectInfo string, receiver chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1, _param2} - result := pegomock.GetGenericMockFrom(mock).Invoke("Receive", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 + params := []pegomock.Param{projectInfo, receiver} + pegomock.GetGenericMockFrom(mock).Invoke("Deregister", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) { +func (mock *MockProjectCommandOutputHandler) Handle() { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) + params := []pegomock.Param{} + pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) error { +func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1, _param2} + params := []pegomock.Param{ctx, cmdName, status} result := pegomock.GetGenericMockFrom(mock).Invoke("SetJobURLWithStatus", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) var ret0 error if len(result) != 0 { @@ -87,6 +80,14 @@ func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models. return ret0 } +func (mock *MockProjectCommandOutputHandler) CleanUp(pull string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") + } + params := []pegomock.Param{pull} + pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) +} + func (mock *MockProjectCommandOutputHandler) VerifyWasCalledOnce() *VerifierMockProjectCommandOutputHandler { return &VerifierMockProjectCommandOutputHandler{ mock: mock, @@ -124,94 +125,112 @@ type VerifierMockProjectCommandOutputHandler struct { timeout time.Duration } -func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(_param0 string) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) - return &MockProjectCommandOutputHandler_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { + params := []pegomock.Param{ctx} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_CleanUp_OngoingVerification struct { +type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() string { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { + ctx := c.GetAllCapturedArguments() + return ctx[len(ctx)-1] } -func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { +func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]string, len(c.methodInvocations)) + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(string) + _param0[u] = param.(models.ProjectCommandContext) } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) *MockProjectCommandOutputHandler_Send_OngoingVerification { + params := []pegomock.Param{ctx, msg} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { +type MockProjectCommandOutputHandler_Send_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { + ctx, msg := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], msg[len(msg)-1] } -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(models.ProjectCommandContext) } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) *MockProjectCommandOutputHandler_Register_OngoingVerification { + params := []pegomock.Param{projectInfo, receiver} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Register", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Register_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { +type MockProjectCommandOutputHandler_Register_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { +func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetCapturedArguments() (string, chan string) { + projectInfo, receiver := c.GetAllCapturedArguments() + return projectInfo[len(projectInfo)-1], receiver[len(receiver)-1] } -func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { +func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([]chan string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(chan string) + } + } + return } -func (verifier *VerifierMockProjectCommandOutputHandler) Receive(_param0 string, _param1 chan string, _param2 func(string) error) *MockProjectCommandOutputHandler_Receive_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Receive", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Receive_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Deregister(projectInfo string, receiver chan string) *MockProjectCommandOutputHandler_Deregister_OngoingVerification { + params := []pegomock.Param{projectInfo, receiver} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Deregister", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Deregister_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Receive_OngoingVerification struct { +type MockProjectCommandOutputHandler_Deregister_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetCapturedArguments() (string, chan string, func(string) error) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] +func (c *MockProjectCommandOutputHandler_Deregister_OngoingVerification) GetCapturedArguments() (string, chan string) { + projectInfo, receiver := c.GetAllCapturedArguments() + return projectInfo[len(projectInfo)-1], receiver[len(receiver)-1] } -func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string, _param2 []func(string) error) { +func (c *MockProjectCommandOutputHandler_Deregister_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) @@ -222,47 +241,29 @@ func (c *MockProjectCommandOutputHandler_Receive_OngoingVerification) GetAllCapt for u, param := range params[1] { _param1[u] = param.(chan string) } - _param2 = make([]func(string) error, len(c.methodInvocations)) - for u, param := range params[2] { - _param2[u] = param.(func(string) error) - } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) *MockProjectCommandOutputHandler_Send_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) Handle() *MockProjectCommandOutputHandler_Handle_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Handle", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Handle_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Send_OngoingVerification struct { +type MockProjectCommandOutputHandler_Handle_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCapturedArguments() { } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(models.ProjectCommandContext) - } - _param1 = make([]string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(string) - } - } - return +func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { } -func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} +func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { + params := []pegomock.Param{ctx, cmdName, status} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetJobURLWithStatus", params, verifier.timeout) return &MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -273,8 +274,8 @@ type MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification str } func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, models.CommandName, models.CommitStatus) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] + ctx, cmdName, status := c.GetAllCapturedArguments() + return ctx[len(ctx)-1], cmdName[len(cmdName)-1], status[len(status)-1] } func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []models.CommandName, _param2 []models.CommitStatus) { @@ -295,3 +296,30 @@ func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification } return } + +func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(pull string) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { + params := []pegomock.Param{pull} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) + return &MockProjectCommandOutputHandler_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockProjectCommandOutputHandler_CleanUp_OngoingVerification struct { + mock *MockProjectCommandOutputHandler + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() string { + pull := c.GetAllCapturedArguments() + return pull[len(pull)-1] +} + +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} diff --git a/server/handlers/mocks/mock_resource_cleaner.go b/server/handlers/mocks/mock_resource_cleaner.go new file mode 100644 index 000000000..357faf36f --- /dev/null +++ b/server/handlers/mocks/mock_resource_cleaner.go @@ -0,0 +1,97 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ResourceCleaner) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + "reflect" + "time" +) + +type MockResourceCleaner struct { + fail func(message string, callerSkip ...int) +} + +func NewMockResourceCleaner(options ...pegomock.Option) *MockResourceCleaner { + mock := &MockResourceCleaner{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockResourceCleaner) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockResourceCleaner) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockResourceCleaner) CleanUp(pull string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockResourceCleaner().") + } + params := []pegomock.Param{pull} + pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) +} + +func (mock *MockResourceCleaner) VerifyWasCalledOnce() *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockResourceCleaner) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockResourceCleaner) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockResourceCleaner) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockResourceCleaner { + return &VerifierMockResourceCleaner{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockResourceCleaner struct { + mock *MockResourceCleaner + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockResourceCleaner) CleanUp(pull string) *MockResourceCleaner_CleanUp_OngoingVerification { + params := []pegomock.Param{pull} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) + return &MockResourceCleaner_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockResourceCleaner_CleanUp_OngoingVerification struct { + mock *MockResourceCleaner + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetCapturedArguments() string { + pull := c.GetAllCapturedArguments() + return pull[len(pull)-1] +} + +func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} diff --git a/server/handlers/mocks/mock_websocket_connection_wrapper.go b/server/handlers/mocks/mock_websocket_connection_wrapper.go deleted file mode 100644 index f9293cc70..000000000 --- a/server/handlers/mocks/mock_websocket_connection_wrapper.go +++ /dev/null @@ -1,183 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketConnectionWrapper) - -package mocks - -import ( - pegomock "github.com/petergtz/pegomock" - "reflect" - "time" -) - -type MockWebsocketConnectionWrapper struct { - fail func(message string, callerSkip ...int) -} - -func NewMockWebsocketConnectionWrapper(options ...pegomock.Option) *MockWebsocketConnectionWrapper { - mock := &MockWebsocketConnectionWrapper{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockWebsocketConnectionWrapper) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockWebsocketConnectionWrapper) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockWebsocketConnectionWrapper) ReadMessage() (int, []byte, error) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketConnectionWrapper().") - } - params := []pegomock.Param{} - result := pegomock.GetGenericMockFrom(mock).Invoke("ReadMessage", params, []reflect.Type{reflect.TypeOf((*int)(nil)).Elem(), reflect.TypeOf((*[]byte)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 int - var ret1 []byte - var ret2 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(int) - } - if result[1] != nil { - ret1 = result[1].([]byte) - } - if result[2] != nil { - ret2 = result[2].(error) - } - } - return ret0, ret1, ret2 -} - -func (mock *MockWebsocketConnectionWrapper) SetCloseHandler(_param0 func(int, string) error) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketConnectionWrapper().") - } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("SetCloseHandler", params, []reflect.Type{}) -} - -func (mock *MockWebsocketConnectionWrapper) WriteMessage(_param0 int, _param1 []byte) error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketConnectionWrapper().") - } - params := []pegomock.Param{_param0, _param1} - result := pegomock.GetGenericMockFrom(mock).Invoke("WriteMessage", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalledOnce() *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockWebsocketConnectionWrapper) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockWebsocketConnectionWrapper { - return &VerifierMockWebsocketConnectionWrapper{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockWebsocketConnectionWrapper struct { - mock *MockWebsocketConnectionWrapper - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockWebsocketConnectionWrapper) ReadMessage() *MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ReadMessage", params, verifier.timeout) - return &MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification struct { - mock *MockWebsocketConnectionWrapper - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification) GetCapturedArguments() { -} - -func (c *MockWebsocketConnectionWrapper_ReadMessage_OngoingVerification) GetAllCapturedArguments() { -} - -func (verifier *VerifierMockWebsocketConnectionWrapper) SetCloseHandler(_param0 func(int, string) error) *MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetCloseHandler", params, verifier.timeout) - return &MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification struct { - mock *MockWebsocketConnectionWrapper - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification) GetCapturedArguments() func(int, string) error { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockWebsocketConnectionWrapper_SetCloseHandler_OngoingVerification) GetAllCapturedArguments() (_param0 []func(int, string) error) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]func(int, string) error, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(func(int, string) error) - } - } - return -} - -func (verifier *VerifierMockWebsocketConnectionWrapper) WriteMessage(_param0 int, _param1 []byte) *MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "WriteMessage", params, verifier.timeout) - return &MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification struct { - mock *MockWebsocketConnectionWrapper - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification) GetCapturedArguments() (int, []byte) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] -} - -func (c *MockWebsocketConnectionWrapper_WriteMessage_OngoingVerification) GetAllCapturedArguments() (_param0 []int, _param1 [][]byte) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]int, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(int) - } - _param1 = make([][]byte, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.([]byte) - } - } - return -} diff --git a/server/handlers/mocks/mock_websocket_handler.go b/server/handlers/mocks/mock_websocket_handler.go deleted file mode 100644 index 7c7f2e592..000000000 --- a/server/handlers/mocks/mock_websocket_handler.go +++ /dev/null @@ -1,192 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketHandler) - -package mocks - -import ( - pegomock "github.com/petergtz/pegomock" - handlers "github.com/runatlantis/atlantis/server/handlers" - http "net/http" - "reflect" - "time" -) - -type MockWebsocketHandler struct { - fail func(message string, callerSkip ...int) -} - -func NewMockWebsocketHandler(options ...pegomock.Option) *MockWebsocketHandler { - mock := &MockWebsocketHandler{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockWebsocketHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockWebsocketHandler) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockWebsocketHandler) SetCloseHandler(_param0 handlers.WebsocketConnectionWrapper, _param1 chan string) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") - } - params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("SetCloseHandler", params, []reflect.Type{}) -} - -func (mock *MockWebsocketHandler) SetReadHandler(_param0 handlers.WebsocketConnectionWrapper) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") - } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("SetReadHandler", params, []reflect.Type{}) -} - -func (mock *MockWebsocketHandler) Upgrade(_param0 http.ResponseWriter, _param1 *http.Request, _param2 http.Header) (handlers.WebsocketConnectionWrapper, error) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketHandler().") - } - params := []pegomock.Param{_param0, _param1, _param2} - result := pegomock.GetGenericMockFrom(mock).Invoke("Upgrade", params, []reflect.Type{reflect.TypeOf((*handlers.WebsocketConnectionWrapper)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 handlers.WebsocketConnectionWrapper - var ret1 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(handlers.WebsocketConnectionWrapper) - } - if result[1] != nil { - ret1 = result[1].(error) - } - } - return ret0, ret1 -} - -func (mock *MockWebsocketHandler) VerifyWasCalledOnce() *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockWebsocketHandler) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockWebsocketHandler) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockWebsocketHandler) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockWebsocketHandler { - return &VerifierMockWebsocketHandler{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockWebsocketHandler struct { - mock *MockWebsocketHandler - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockWebsocketHandler) SetCloseHandler(_param0 handlers.WebsocketConnectionWrapper, _param1 chan string) *MockWebsocketHandler_SetCloseHandler_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetCloseHandler", params, verifier.timeout) - return &MockWebsocketHandler_SetCloseHandler_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketHandler_SetCloseHandler_OngoingVerification struct { - mock *MockWebsocketHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketHandler_SetCloseHandler_OngoingVerification) GetCapturedArguments() (handlers.WebsocketConnectionWrapper, chan string) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] -} - -func (c *MockWebsocketHandler_SetCloseHandler_OngoingVerification) GetAllCapturedArguments() (_param0 []handlers.WebsocketConnectionWrapper, _param1 []chan string) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]handlers.WebsocketConnectionWrapper, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(handlers.WebsocketConnectionWrapper) - } - _param1 = make([]chan string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(chan string) - } - } - return -} - -func (verifier *VerifierMockWebsocketHandler) SetReadHandler(_param0 handlers.WebsocketConnectionWrapper) *MockWebsocketHandler_SetReadHandler_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetReadHandler", params, verifier.timeout) - return &MockWebsocketHandler_SetReadHandler_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketHandler_SetReadHandler_OngoingVerification struct { - mock *MockWebsocketHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketHandler_SetReadHandler_OngoingVerification) GetCapturedArguments() handlers.WebsocketConnectionWrapper { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockWebsocketHandler_SetReadHandler_OngoingVerification) GetAllCapturedArguments() (_param0 []handlers.WebsocketConnectionWrapper) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]handlers.WebsocketConnectionWrapper, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(handlers.WebsocketConnectionWrapper) - } - } - return -} - -func (verifier *VerifierMockWebsocketHandler) Upgrade(_param0 http.ResponseWriter, _param1 *http.Request, _param2 http.Header) *MockWebsocketHandler_Upgrade_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Upgrade", params, verifier.timeout) - return &MockWebsocketHandler_Upgrade_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketHandler_Upgrade_OngoingVerification struct { - mock *MockWebsocketHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketHandler_Upgrade_OngoingVerification) GetCapturedArguments() (http.ResponseWriter, *http.Request, http.Header) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] -} - -func (c *MockWebsocketHandler_Upgrade_OngoingVerification) GetAllCapturedArguments() (_param0 []http.ResponseWriter, _param1 []*http.Request, _param2 []http.Header) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]http.ResponseWriter, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(http.ResponseWriter) - } - _param1 = make([]*http.Request, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(*http.Request) - } - _param2 = make([]http.Header, len(c.methodInvocations)) - for u, param := range params[2] { - _param2[u] = param.(http.Header) - } - } - return -} diff --git a/server/handlers/mocks/mock_websocket_response_writer.go b/server/handlers/mocks/mock_websocket_response_writer.go deleted file mode 100644 index 819c8fe81..000000000 --- a/server/handlers/mocks/mock_websocket_response_writer.go +++ /dev/null @@ -1,140 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: WebsocketResponseWriter) - -package mocks - -import ( - pegomock "github.com/petergtz/pegomock" - "reflect" - "time" -) - -type MockWebsocketResponseWriter struct { - fail func(message string, callerSkip ...int) -} - -func NewMockWebsocketResponseWriter(options ...pegomock.Option) *MockWebsocketResponseWriter { - mock := &MockWebsocketResponseWriter{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockWebsocketResponseWriter) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockWebsocketResponseWriter) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockWebsocketResponseWriter) WriteMessage(messageType int, data []byte) error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketResponseWriter().") - } - params := []pegomock.Param{messageType, data} - result := pegomock.GetGenericMockFrom(mock).Invoke("WriteMessage", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockWebsocketResponseWriter) Close() error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockWebsocketResponseWriter().") - } - params := []pegomock.Param{} - result := pegomock.GetGenericMockFrom(mock).Invoke("Close", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalledOnce() *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockWebsocketResponseWriter) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockWebsocketResponseWriter { - return &VerifierMockWebsocketResponseWriter{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockWebsocketResponseWriter struct { - mock *MockWebsocketResponseWriter - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockWebsocketResponseWriter) WriteMessage(messageType int, data []byte) *MockWebsocketResponseWriter_WriteMessage_OngoingVerification { - params := []pegomock.Param{messageType, data} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "WriteMessage", params, verifier.timeout) - return &MockWebsocketResponseWriter_WriteMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketResponseWriter_WriteMessage_OngoingVerification struct { - mock *MockWebsocketResponseWriter - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketResponseWriter_WriteMessage_OngoingVerification) GetCapturedArguments() (int, []byte) { - messageType, data := c.GetAllCapturedArguments() - return messageType[len(messageType)-1], data[len(data)-1] -} - -func (c *MockWebsocketResponseWriter_WriteMessage_OngoingVerification) GetAllCapturedArguments() (_param0 []int, _param1 [][]byte) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]int, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(int) - } - _param1 = make([][]byte, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.([]byte) - } - } - return -} - -func (verifier *VerifierMockWebsocketResponseWriter) Close() *MockWebsocketResponseWriter_Close_OngoingVerification { - params := []pegomock.Param{} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Close", params, verifier.timeout) - return &MockWebsocketResponseWriter_Close_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockWebsocketResponseWriter_Close_OngoingVerification struct { - mock *MockWebsocketResponseWriter - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockWebsocketResponseWriter_Close_OngoingVerification) GetCapturedArguments() { -} - -func (c *MockWebsocketResponseWriter_Close_OngoingVerification) GetAllCapturedArguments() { -} From 0278d6a0ef13333e7a91db9b3f362d68e65bd0f4 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Fri, 12 Nov 2021 09:32:52 -0800 Subject: [PATCH 03/10] Fix up tests and remove unnecessray mocks. --- server/controllers/jobs_controller.go | 22 ++- server/controllers/jobs_controller_test.go | 47 ------ server/controllers/websocket/writer.go | 2 +- .../handlers_websocketconnectionwrapper.go | 33 ----- .../project_command_output_handler.go | 10 +- .../project_command_output_handler_test.go | 136 +++++++----------- 6 files changed, 69 insertions(+), 181 deletions(-) delete mode 100644 server/controllers/jobs_controller_test.go delete mode 100644 server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go diff --git a/server/controllers/jobs_controller.go b/server/controllers/jobs_controller.go index 8c546d981..9134620a4 100644 --- a/server/controllers/jobs_controller.go +++ b/server/controllers/jobs_controller.go @@ -15,22 +15,18 @@ import ( "github.com/runatlantis/atlantis/server/core/db" "github.com/runatlantis/atlantis/server/events/metrics" "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" ) type JobsController struct { - AtlantisVersion string - AtlantisURL *url.URL - Logger logging.SimpleLogging - ProjectJobsTemplate templates.TemplateWriter - ProjectJobsErrorTemplate templates.TemplateWriter - Db *db.BoltDB - WsMux *websocket.Multiplexor - - websocketWriter *websocket.Writer - ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler - StatsScope stats.Scope + AtlantisVersion string + AtlantisURL *url.URL + Logger logging.SimpleLogging + ProjectJobsTemplate templates.TemplateWriter + ProjectJobsErrorTemplate templates.TemplateWriter + Db *db.BoltDB + WsMux *websocket.Multiplexor + StatsScope stats.Scope } type ProjectInfoKeyGenerator struct{} @@ -146,7 +142,7 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request) } func (j *JobsController) getProjectJobsWS(w http.ResponseWriter, r *http.Request) error { - err := j.wsMux.Handle(w, r) + err := j.WsMux.Handle(w, r) if err != nil { j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) diff --git a/server/controllers/jobs_controller_test.go b/server/controllers/jobs_controller_test.go deleted file mode 100644 index 9be9af888..000000000 --- a/server/controllers/jobs_controller_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package controllers_test - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/gorilla/mux" - stats "github.com/lyft/gostats" - "github.com/runatlantis/atlantis/server/controllers" - "github.com/runatlantis/atlantis/server/logging" - - . "github.com/petergtz/pegomock" - "github.com/runatlantis/atlantis/server/handlers/mocks" - "github.com/runatlantis/atlantis/server/handlers/mocks/matchers" -) - -func TestGetProjectJobs_WebSockets(t *testing.T) { - t.Run("Should Group by Project Info", func(t *testing.T) { - RegisterMockTestingT(t) - websocketMock := mocks.NewMockWebsocketHandler() - projectOutputHandler := mocks.NewMockProjectCommandOutputHandler() - logger := logging.NewNoopLogger(t) - webSocketWrapper := mocks.NewMockWebsocketConnectionWrapper() - params := map[string]string{ - "org": "test-org", - "repo": "test-repo", - "pull": "1", - "project": "test-project", - } - request, _ := http.NewRequest(http.MethodGet, "/logStreaming/org/repo/1/project/ws", nil) - request = mux.SetURLVars(request, params) - response := httptest.NewRecorder() - jobsController := &controllers.JobsController{ - Logger: logger, - WebsocketHandler: websocketMock, - ProjectCommandOutputHandler: projectOutputHandler, - StatsScope: stats.NewDefaultStore(), - } - - When(websocketMock.Upgrade(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest(), matchers.AnyHttpHeader())).ThenReturn(webSocketWrapper, nil) - - jobsController.GetProjectJobsWS(response, request) - - webSocketWrapper.VerifyWasCalled(Once()) - }) -} diff --git a/server/controllers/websocket/writer.go b/server/controllers/websocket/writer.go index 7c266d474..b408cf5d9 100644 --- a/server/controllers/websocket/writer.go +++ b/server/controllers/websocket/writer.go @@ -21,7 +21,7 @@ type Writer struct { upgrader websocket.Upgrader //TODO: Remove dependency on atlantis logger here if we upstream this. - log logging.SimpleLogging + log logging.SimpleLogging } func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error { diff --git a/server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go b/server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go deleted file mode 100644 index 4d2124911..000000000 --- a/server/handlers/mocks/matchers/handlers_websocketconnectionwrapper.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - handlers "github.com/runatlantis/atlantis/server/handlers" -) - -func AnyHandlersWebsocketConnectionWrapper() handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(handlers.WebsocketConnectionWrapper))(nil)).Elem())) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} - -func EqHandlersWebsocketConnectionWrapper(value handlers.WebsocketConnectionWrapper) handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} - -func NotEqHandlersWebsocketConnectionWrapper(value handlers.WebsocketConnectionWrapper) handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} - -func HandlersWebsocketConnectionWrapperThat(matcher pegomock.ArgumentMatcher) handlers.WebsocketConnectionWrapper { - pegomock.RegisterMatcher(matcher) - var nullValue handlers.WebsocketConnectionWrapper - return nullValue -} diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go index ff2ef7c3e..4525c62f9 100644 --- a/server/handlers/project_command_output_handler.go +++ b/server/handlers/project_command_output_handler.go @@ -98,10 +98,9 @@ func (p *AsyncProjectCommandOutputHandler) Register(projectInfo string, receiver p.addChan(receiver, projectInfo) } - - func (p *AsyncProjectCommandOutputHandler) Handle() { for msg := range p.projectCmdOutput { + fmt.Println(fmt.Sprintf("Received: %s", msg.Line)) if msg.ClearBuffBefore { p.clearLogLines(msg.ProjectInfo) } @@ -151,11 +150,15 @@ func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, pull string) //Add log line to buffer and send to all current channels func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string) { + fmt.Println("Capturing receiver lock") p.receiverBuffersLock.Lock() + fmt.Println("Captured receiver lock") for ch := range p.receiverBuffers[pull] { select { case ch <- line: + fmt.Println("Wrote to channel") default: + fmt.Println("Deleting to channel") // Client ws conn could be closed in two ways: // 1. Client closes the conn gracefully -> the closeHandler() is executed which // closes the channel and cleans up resources. @@ -165,6 +168,7 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string delete(p.receiverBuffers[pull], ch) } } + fmt.Println("End of loop") p.receiverBuffersLock.Unlock() // No need to write to projectOutputBuffers if clear msg. @@ -215,7 +219,7 @@ type NoopProjectOutputHandler struct{} func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { } -func (p *NoopProjectOutputHandler) Register(projectInfo string, receiver chan string) {} +func (p *NoopProjectOutputHandler) Register(projectInfo string, receiver chan string) {} func (p *NoopProjectOutputHandler) Deregister(projectInfo string, receiver chan string) {} func (p *NoopProjectOutputHandler) Handle() { diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go index b7c6af914..bb6cc8522 100644 --- a/server/handlers/project_command_output_handler_test.go +++ b/server/handlers/project_command_output_handler_test.go @@ -2,9 +2,9 @@ package handlers_test import ( "errors" + "fmt" "sync" "testing" - "time" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/handlers" @@ -67,139 +67,107 @@ func TestProjectCommandOutputHandler(t *testing.T) { Msg := "Test Terraform Output" ctx := createTestProjectCmdContext(t) - t.Run("Should Receive Message Sent in the ProjectCmdOutput channel", func(t *testing.T) { + t.Run("receive message from main channel", func(t *testing.T) { var wg sync.WaitGroup var expectedMsg string projectOutputHandler := createProjectCommandOutputHandler(t) - wg.Add(1) ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.PullInfo(), ch) + + wg.Add(1) + + // read from channel go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { + for msg := range ch { expectedMsg = msg wg.Done() - return nil - }) - Ok(t, err) + } }() projectOutputHandler.Send(ctx, Msg) wg.Wait() - close(ch) Equals(t, expectedMsg, Msg) }) - t.Run("Should Clear ProjectOutputBuffer when new Plan", func(t *testing.T) { + t.Run("clear buffer", func(t *testing.T) { var wg sync.WaitGroup projectOutputHandler := createProjectCommandOutputHandler(t) - wg.Add(1) - ch := make(chan string) - go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { - wg.Done() - return nil - }) - Ok(t, err) - }() - - projectOutputHandler.Send(ctx, Msg) - wg.Wait() + fmt.Println("hello") - // Send a clear msg - wg.Add(1) - projectOutputHandler.Clear(ctx) - wg.Wait() - - close(ch) - - dfProjectOutputHandler, ok := projectOutputHandler.(*handlers.AsyncProjectCommandOutputHandler) - assert.True(t, ok) - - // Wait for the clear msg to be received by handle() - time.Sleep(1 * time.Second) - assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) - }) - - t.Run("Should Cleanup receiverBuffers receiving WS channel closed", func(t *testing.T) { - var wg sync.WaitGroup + ch := make(chan string) - projectOutputHandler := createProjectCommandOutputHandler(t) + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.PullInfo(), ch) wg.Add(1) - ch := make(chan string) + // read from channel asynchronously go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { - wg.Done() - return nil - }) - Ok(t, err) + for msg := range ch { + fmt.Println(msg) + // we are done once we receive the clear message. + // prior message doesn't matter for this test. + if msg == models.LogStreamingClearMsg { + wg.Done() + } + } }() + // send regular message followed by clear message projectOutputHandler.Send(ctx, Msg) - - // Wait for the msg to be read. + projectOutputHandler.Clear(ctx) wg.Wait() - - // Close chan to execute cleanup. close(ch) - time.Sleep(1 * time.Second) dfProjectOutputHandler, ok := projectOutputHandler.(*handlers.AsyncProjectCommandOutputHandler) assert.True(t, ok) - x := dfProjectOutputHandler.GetReceiverBufferForPull(ctx.PullInfo()) - assert.Empty(t, x) + assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) }) - t.Run("Should copy over existing log messages to new WS channels", func(t *testing.T) { + t.Run("copies buffer to new channels", func(t *testing.T) { var wg sync.WaitGroup projectOutputHandler := createProjectCommandOutputHandler(t) - wg.Add(1) - ch := make(chan string) - go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { - wg.Done() - return nil - }) - Ok(t, err) - }() - + // send first message to populated the buffer projectOutputHandler.Send(ctx, Msg) - // Wait for the msg to be read. - wg.Wait() - - // Close channel to close prev connection. - // This should close the first go routine with receive call. - close(ch) - - ch = make(chan string) - - // Expecting two calls to callback. - wg.Add(2) + ch := make(chan string) receivedMsgs := []string{} + + wg.Add(1) + // read from channel asynchronously go func() { - err := projectOutputHandler.Receive(ctx.PullInfo(), ch, func(msg string) error { + for msg := range ch { receivedMsgs = append(receivedMsgs, msg) - wg.Done() - return nil - }) - Ok(t, err) - }() - // Make sure addChan gets the buffer lock and adds ch to the map. - time.Sleep(1 * time.Second) + // we're only expecting two messages here. + if len(receivedMsgs) >= 2 { + wg.Done() + } + } + }() + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.PullInfo(), ch) projectOutputHandler.Send(ctx, Msg) - - // Wait for the message to be read. wg.Wait() close(ch) From 0f9b435a77832f4531b8008d758eb671f453d929 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Fri, 12 Nov 2021 09:35:30 -0800 Subject: [PATCH 04/10] Remove all the printlns. --- server/handlers/project_command_output_handler.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go index 4525c62f9..c450ab8b9 100644 --- a/server/handlers/project_command_output_handler.go +++ b/server/handlers/project_command_output_handler.go @@ -1,7 +1,6 @@ package handlers import ( - "fmt" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/logging" "sync" @@ -100,7 +99,6 @@ func (p *AsyncProjectCommandOutputHandler) Register(projectInfo string, receiver func (p *AsyncProjectCommandOutputHandler) Handle() { for msg := range p.projectCmdOutput { - fmt.Println(fmt.Sprintf("Received: %s", msg.Line)) if msg.ClearBuffBefore { p.clearLogLines(msg.ProjectInfo) } @@ -150,15 +148,11 @@ func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, pull string) //Add log line to buffer and send to all current channels func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string) { - fmt.Println("Capturing receiver lock") p.receiverBuffersLock.Lock() - fmt.Println("Captured receiver lock") for ch := range p.receiverBuffers[pull] { select { case ch <- line: - fmt.Println("Wrote to channel") default: - fmt.Println("Deleting to channel") // Client ws conn could be closed in two ways: // 1. Client closes the conn gracefully -> the closeHandler() is executed which // closes the channel and cleans up resources. @@ -168,7 +162,6 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string delete(p.receiverBuffers[pull], ch) } } - fmt.Println("End of loop") p.receiverBuffersLock.Unlock() // No need to write to projectOutputBuffers if clear msg. @@ -186,7 +179,7 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string //Remove channel, so client no longer receives Terraform output func (p *AsyncProjectCommandOutputHandler) Deregister(pull string, ch chan string) { - p.logger.Debug(fmt.Sprintf("Removing channel for %s", pull)) + p.logger.Debug("Removing channel for %s", pull) p.receiverBuffersLock.Lock() delete(p.receiverBuffers[pull], ch) p.receiverBuffersLock.Unlock() From 60e3d5947a37203e77b97845fff363a9cf4a4d9e Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Fri, 12 Nov 2021 10:33:22 -0800 Subject: [PATCH 05/10] Fix race condition for adding new channel. --- .../handlers/project_command_output_handler.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go index c450ab8b9..2504e9649 100644 --- a/server/handlers/project_command_output_handler.go +++ b/server/handlers/project_command_output_handler.go @@ -130,13 +130,6 @@ func (p *AsyncProjectCommandOutputHandler) clearLogLines(pull string) { } func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, pull string) { - p.receiverBuffersLock.Lock() - if p.receiverBuffers[pull] == nil { - p.receiverBuffers[pull] = map[chan string]bool{} - } - p.receiverBuffers[pull][ch] = true - p.receiverBuffersLock.Unlock() - p.projectOutputBuffersLock.RLock() buffer := p.projectOutputBuffers[pull] p.projectOutputBuffersLock.RUnlock() @@ -144,6 +137,15 @@ func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, pull string) for _, line := range buffer { ch <- line } + + // add the channel to our registry after we backfill the contents of the buffer, + // to prevent new messages coming in interleaving with this backfill. + p.receiverBuffersLock.Lock() + if p.receiverBuffers[pull] == nil { + p.receiverBuffers[pull] = map[chan string]bool{} + } + p.receiverBuffers[pull][ch] = true + p.receiverBuffersLock.Unlock() } //Add log line to buffer and send to all current channels From 1df15b6e5fe53d7bbaacee6f1f55d0489c6aa952 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Tue, 16 Nov 2021 09:04:20 -0800 Subject: [PATCH 06/10] Update server/controllers/websocket/writer.go Co-authored-by: Aayush Gupta <43479002+Aayyush@users.noreply.github.com> --- server/controllers/websocket/writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/controllers/websocket/writer.go b/server/controllers/websocket/writer.go index b408cf5d9..eca3f6ae6 100644 --- a/server/controllers/websocket/writer.go +++ b/server/controllers/websocket/writer.go @@ -33,7 +33,7 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin conn.SetCloseHandler(func(code int, text string) error { // Close the channnel after websocket connection closed. - // Will gracefully exit the ProjectCommandOutputHandler.Receive() call and cleanup. + // Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup. // is it good practice to close at the receiver? Probably not, we should figure out a better // way to handle this case close(input) From a4a8ed5f692be44e4a44a73d33ed6dd0360dab61 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Tue, 16 Nov 2021 21:38:36 -0800 Subject: [PATCH 07/10] Update server/handlers/project_command_output_handler_test.go Co-authored-by: Aayush Gupta <43479002+Aayyush@users.noreply.github.com> --- server/handlers/project_command_output_handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go index bb6cc8522..cc3c40486 100644 --- a/server/handlers/project_command_output_handler_test.go +++ b/server/handlers/project_command_output_handler_test.go @@ -102,7 +102,7 @@ func TestProjectCommandOutputHandler(t *testing.T) { projectOutputHandler := createProjectCommandOutputHandler(t) - fmt.Println("hello") + ch := make(chan string) From a0f6bca833d587f48113de4f37867072cc3b4917 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Tue, 16 Nov 2021 21:55:59 -0800 Subject: [PATCH 08/10] Add missing registry field. --- server/controllers/websocket/mux.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/controllers/websocket/mux.go b/server/controllers/websocket/mux.go index 6a5b407bb..ccfbdf99f 100644 --- a/server/controllers/websocket/mux.go +++ b/server/controllers/websocket/mux.go @@ -38,6 +38,7 @@ func NewMultiplexor(log logging.SimpleLogging, keyGenerator PartitionKeyGenerato log: log, }, keyGenerator: keyGenerator, + registry: registry, } } From a4fb550dc9fbc2e768f4804f4adc97788d817d38 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Thu, 2 Dec 2021 11:18:20 -0500 Subject: [PATCH 09/10] Update server/handlers/project_command_output_handler_test.go Co-authored-by: Aayush Gupta <43479002+Aayyush@users.noreply.github.com> --- server/handlers/project_command_output_handler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go index cc3c40486..cf6466e69 100644 --- a/server/handlers/project_command_output_handler_test.go +++ b/server/handlers/project_command_output_handler_test.go @@ -116,7 +116,6 @@ func TestProjectCommandOutputHandler(t *testing.T) { // read from channel asynchronously go func() { for msg := range ch { - fmt.Println(msg) // we are done once we receive the clear message. // prior message doesn't matter for this test. if msg == models.LogStreamingClearMsg { From b13d4cf2c21890cb59ce9f7db409ddf91c572a88 Mon Sep 17 00:00:00 2001 From: Nish Krishnan Date: Thu, 2 Dec 2021 11:35:51 -0500 Subject: [PATCH 10/10] remove fmt. --- server/handlers/project_command_output_handler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go index cf6466e69..a37a2c90b 100644 --- a/server/handlers/project_command_output_handler_test.go +++ b/server/handlers/project_command_output_handler_test.go @@ -2,7 +2,6 @@ package handlers_test import ( "errors" - "fmt" "sync" "testing"