From 58e9b42bbc21bd6d2706b8c8b9c6cd6ad699c3c8 Mon Sep 17 00:00:00 2001 From: Aayush Gupta <43479002+Aayyush@users.noreply.github.com> Date: Wed, 9 Feb 2022 13:15:49 -0800 Subject: [PATCH] feat: Use UUIDs to identify log streaming jobs (#2051) * Add UUID for Log Streaming Job ID (#167) * Update log handler to close buffered channels when an operation is complete (#170) * Add preliminary check before registering new receivers in the log handler (#173) * Using projectOutputBuffers to check for jobID instead of receiverBuffers (#181) * Refactor log handler (#175) * Reverting go.mod and go.sum * Fix lint errors * Fix linting --- .../events/events_controller_e2e_test.go | 4 +- server/controllers/jobs_controller.go | 110 ++------ server/controllers/templates/web_templates.go | 11 +- server/controllers/websocket/mux.go | 7 + server/controllers/websocket/writer.go | 31 +- server/core/terraform/terraform_client.go | 14 +- .../terraform_client_internal_test.go | 16 +- .../core/terraform/terraform_client_test.go | 20 +- server/events/mocks/matchers/jobs_pullinfo.go | 33 +++ server/events/mocks/mock_job_id_generator.go | 94 +++++++ .../events/mocks/mock_job_message_sender.go | 106 +++++++ server/events/mocks/mock_job_url_setter.go | 113 ++++++++ .../mocks/mock_resource_cleaner.go | 15 +- server/events/models/models.go | 11 +- server/events/project_command_builder.go | 2 +- .../project_command_builder_internal_test.go | 10 + .../events/project_command_context_builder.go | 4 +- server/events/project_command_runner.go | 37 ++- server/events/project_command_runner_test.go | 14 +- server/events/pull_closed_executor.go | 24 +- server/events/pull_closed_executor_test.go | 13 +- server/handlers/mocks/matchers/http_header.go | 33 --- .../mocks/matchers/http_responsewriter.go | 33 --- .../matchers/map_of_chan_of_string_to_bool.go | 31 -- .../mocks/matchers/ptr_to_http_request.go | 33 --- .../handlers/mocks/matchers/slice_of_byte.go | 31 -- .../mocks/matchers/slice_of_string.go | 31 -- .../project_command_output_handler.go | 232 --------------- .../project_command_output_handler_test.go | 216 -------------- server/handlers/websocket_handler.go | 61 ---- server/jobs/job_url_setter.go | 39 +++ server/jobs/job_url_setter_test.go | 44 +++ .../mocks/matchers/chan_of_string.go | 0 server/jobs/mocks/matchers/jobs_pullinfo.go | 33 +++ .../mocks/matchers/models_commandname.go | 0 .../mocks/matchers/models_commitstatus.go | 0 .../matchers/models_projectcommandcontext.go | 0 .../mock_project_command_output_handler.go | 140 ++++----- .../mocks/mock_project_job_url_generator.go | 14 +- .../mocks/mock_project_status_updater.go | 14 +- server/jobs/project_command_output_handler.go | 266 ++++++++++++++++++ .../project_command_output_handler_test.go | 250 ++++++++++++++++ server/router.go | 16 +- server/router_test.go | 92 +----- server/server.go | 26 +- 45 files changed, 1213 insertions(+), 1111 deletions(-) create mode 100644 server/events/mocks/matchers/jobs_pullinfo.go create mode 100644 server/events/mocks/mock_job_id_generator.go create mode 100644 server/events/mocks/mock_job_message_sender.go create mode 100644 server/events/mocks/mock_job_url_setter.go rename server/{handlers => events}/mocks/mock_resource_cleaner.go (85%) delete mode 100644 server/handlers/mocks/matchers/http_header.go delete mode 100644 server/handlers/mocks/matchers/http_responsewriter.go delete mode 100644 server/handlers/mocks/matchers/map_of_chan_of_string_to_bool.go delete mode 100644 server/handlers/mocks/matchers/ptr_to_http_request.go delete mode 100644 server/handlers/mocks/matchers/slice_of_byte.go delete mode 100644 server/handlers/mocks/matchers/slice_of_string.go delete mode 100644 server/handlers/project_command_output_handler.go delete mode 100644 server/handlers/project_command_output_handler_test.go delete mode 100644 server/handlers/websocket_handler.go create mode 100644 server/jobs/job_url_setter.go create mode 100644 server/jobs/job_url_setter_test.go rename server/{handlers => jobs}/mocks/matchers/chan_of_string.go (100%) create mode 100644 server/jobs/mocks/matchers/jobs_pullinfo.go rename server/{handlers => jobs}/mocks/matchers/models_commandname.go (100%) rename server/{handlers => jobs}/mocks/matchers/models_commitstatus.go (100%) rename server/{handlers => jobs}/mocks/matchers/models_projectcommandcontext.go (100%) rename server/{handlers => jobs}/mocks/mock_project_command_output_handler.go (73%) rename server/{handlers => jobs}/mocks/mock_project_job_url_generator.go (88%) rename server/{handlers => jobs}/mocks/mock_project_status_updater.go (84%) create mode 100644 server/jobs/project_command_output_handler.go create mode 100644 server/jobs/project_command_output_handler_test.go diff --git a/server/controllers/events/events_controller_e2e_test.go b/server/controllers/events/events_controller_e2e_test.go index bbc8a99a45..34401dac6a 100644 --- a/server/controllers/events/events_controller_e2e_test.go +++ b/server/controllers/events/events_controller_e2e_test.go @@ -34,7 +34,7 @@ import ( "github.com/runatlantis/atlantis/server/events/vcs" vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks" "github.com/runatlantis/atlantis/server/events/webhooks" - handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -836,7 +836,7 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl e2eStatusUpdater := &events.DefaultCommitStatusUpdater{Client: e2eVCSClient} e2eGithubGetter := mocks.NewMockGithubPullGetter() e2eGitlabGetter := mocks.NewMockGitlabMergeRequestGetter() - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() // Real dependencies. logger := logging.NewNoopLogger(t) diff --git a/server/controllers/jobs_controller.go b/server/controllers/jobs_controller.go index 872176c66d..9c512b20ec 100644 --- a/server/controllers/jobs_controller.go +++ b/server/controllers/jobs_controller.go @@ -5,17 +5,24 @@ import ( "net/http" "net/url" - "strconv" - "github.com/gorilla/mux" - "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/controllers/templates" "github.com/runatlantis/atlantis/server/controllers/websocket" "github.com/runatlantis/atlantis/server/core/db" - "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/logging" ) +type JobIDKeyGenerator struct{} + +func (g JobIDKeyGenerator) Generate(r *http.Request) (string, error) { + jobID, ok := mux.Vars(r)["job-id"] + if !ok { + return "", fmt.Errorf("internal error: no job-id in route") + } + + return jobID, nil +} + type JobsController struct { AtlantisVersion string AtlantisURL *url.URL @@ -24,105 +31,24 @@ type JobsController struct { ProjectJobsErrorTemplate templates.TemplateWriter Db *db.BoltDB WsMux *websocket.Multiplexor -} - -type ProjectInfoKeyGenerator struct{} - -func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) { - projectInfo, err := newProjectInfo(r) - - if err != nil { - return "", errors.Wrap(err, "creating project info") - } - - return projectInfo.String(), nil -} - -type pullInfo struct { - org string - repo string - pull int -} - -func (p *pullInfo) String() string { - return fmt.Sprintf("%s/%s/%d", p.org, p.repo, p.pull) -} - -type projectInfo struct { - projectName string - workspace string - pullInfo -} - -func (p *projectInfo) String() string { - return fmt.Sprintf("%s/%s/%d/%s/%s", p.org, p.repo, p.pull, p.projectName, p.workspace) -} - -func newPullInfo(r *http.Request) (*pullInfo, error) { - org, ok := mux.Vars(r)["org"] - if !ok { - return nil, fmt.Errorf("Internal error: no org in route") - } - repo, ok := mux.Vars(r)["repo"] - if !ok { - return nil, fmt.Errorf("Internal error: no repo in route") - } - pull, ok := mux.Vars(r)["pull"] - if !ok { - return nil, fmt.Errorf("Internal error: no pull in route") - } - pullNum, err := strconv.Atoi(pull) - if err != nil { - return nil, err - } - - return &pullInfo{ - org: org, - repo: repo, - pull: pullNum, - }, nil -} - -// Gets the PR information from the HTTP request params -func newProjectInfo(r *http.Request) (*projectInfo, error) { - pullInfo, err := newPullInfo(r) - if err != nil { - return nil, err - } - - project, ok := mux.Vars(r)["project"] - if !ok { - return nil, fmt.Errorf("Internal error: no project in route") - } - - workspace, ok := mux.Vars(r)["workspace"] - if !ok { - return nil, fmt.Errorf("Internal error: no workspace in route") - } - - return &projectInfo{ - pullInfo: *pullInfo, - projectName: project, - workspace: workspace, - }, nil + KeyGenerator JobIDKeyGenerator } func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request) { - projectInfo, err := newProjectInfo(r) + jobID, err := j.KeyGenerator.Generate(r) + if err != nil { - j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) + j.respond(w, logging.Error, http.StatusBadRequest, err.Error()) return } viewData := templates.ProjectJobData{ AtlantisVersion: j.AtlantisVersion, - ProjectPath: projectInfo.String(), + ProjectPath: jobID, CleanedBasePath: j.AtlantisURL.Path, - ClearMsg: models.LogStreamingClearMsg, } - err = j.ProjectJobsTemplate.Execute(w, viewData) - if err != nil { + if err = j.ProjectJobsTemplate.Execute(w, viewData); err != nil { j.Logger.Err(err.Error()) } } @@ -131,7 +57,7 @@ func (j *JobsController) GetProjectJobsWS(w http.ResponseWriter, r *http.Request err := j.WsMux.Handle(w, r) if err != nil { - j.respond(w, logging.Error, http.StatusInternalServerError, err.Error()) + j.respond(w, logging.Error, http.StatusBadRequest, err.Error()) return } } diff --git a/server/controllers/templates/web_templates.go b/server/controllers/templates/web_templates.go index 68f9ca8076..defe8f29d9 100644 --- a/server/controllers/templates/web_templates.go +++ b/server/controllers/templates/web_templates.go @@ -357,7 +357,6 @@ type ProjectJobData struct { AtlantisVersion string ProjectPath string CleanedBasePath string - ClearMsg string } var ProjectJobsTemplate = template.Must(template.New("blank.html.tmpl").Parse(` @@ -417,13 +416,9 @@ var ProjectJobsTemplate = template.Must(template.New("blank.html.tmpl").Parse(` document.location.host + document.location.pathname + "/ws"); - socket.onmessage = function(event) { - var msg = String.fromCharCode.apply(null, new Uint8Array(event.data)) - if (msg.trim() === "-----Starting New Process-----") { - term.clear() - return - } - } + window.addEventListener("unload", function(event) { + websocket.close(); + }) var attachAddon = new AttachAddon.AttachAddon(socket); var fitAddon = new FitAddon.FitAddon(); term.loadAddon(attachAddon); diff --git a/server/controllers/websocket/mux.go b/server/controllers/websocket/mux.go index ccfbdf99f9..8288df3212 100644 --- a/server/controllers/websocket/mux.go +++ b/server/controllers/websocket/mux.go @@ -1,6 +1,7 @@ package websocket import ( + "fmt" "net/http" "github.com/gorilla/websocket" @@ -18,6 +19,7 @@ type PartitionKeyGenerator interface { type PartitionRegistry interface { Register(key string, buffer chan string) Deregister(key string, buffer chan string) + IsKeyExists(key string) bool } // Multiplexor is responsible for handling the data transfer between the storage layer @@ -51,6 +53,11 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error { return errors.Wrapf(err, "generating partition key") } + // check if the job ID exists before registering receiver + if !m.registry.IsKeyExists(key) { + return fmt.Errorf("invalid key: %s", key) + } + // Buffer size set to 1000 to ensure messages get queued. // TODO: make buffer size configurable buffer := make(chan string, 1000) diff --git a/server/controllers/websocket/writer.go b/server/controllers/websocket/writer.go index 1e19e50376..89eb3b6dbe 100644 --- a/server/controllers/websocket/writer.go +++ b/server/controllers/websocket/writer.go @@ -29,18 +29,6 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin return errors.Wrap(err, "upgrading websocket connection") } - conn.SetCloseHandler(func(code int, text string) error { - // Close the channnel after websocket connection closed. - // Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup. - // is it good practice to close at the receiver? Probably not, we should figure out a better - // way to handle this case - close(input) - return nil - }) - - // Add a reader goroutine to listen for socket.close() events. - go w.setReadHandler(conn) - // block on reading our input channel for msg := range input { if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil { @@ -49,20 +37,9 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin } } - return nil -} - -func (w *Writer) setReadHandler(c *websocket.Conn) { - for { - _, _, err := c.ReadMessage() - if err != nil { - // CloseGoingAway (1001) when a browser tab is closed. - // Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - w.log.Warn("Failed to read WS message: %s", err) - } - return - } + // close ws conn after input channel is closed + if err = conn.Close(); err != nil { + w.log.Warn("Failed to close ws connection: %s", err) } - + return nil } diff --git a/server/core/terraform/terraform_client.go b/server/core/terraform/terraform_client.go index caa4a47c49..5e01954569 100644 --- a/server/core/terraform/terraform_client.go +++ b/server/core/terraform/terraform_client.go @@ -33,7 +33,7 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/terraform/ansi" - "github.com/runatlantis/atlantis/server/handlers" + "github.com/runatlantis/atlantis/server/jobs" "github.com/runatlantis/atlantis/server/logging" ) @@ -79,7 +79,7 @@ type DefaultClient struct { // usePluginCache determines whether or not to set the TF_PLUGIN_CACHE_DIR env var usePluginCache bool - projectCmdOutputHandler handlers.ProjectCommandOutputHandler + projectCmdOutputHandler jobs.ProjectCommandOutputHandler } //go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_downloader.go Downloader @@ -111,7 +111,7 @@ func NewClientWithDefaultVersion( tfDownloader Downloader, usePluginCache bool, fetchAsync bool, - projectCmdOutputHandler handlers.ProjectCommandOutputHandler, + projectCmdOutputHandler jobs.ProjectCommandOutputHandler, ) (*DefaultClient, error) { var finalDefaultVersion *version.Version var localVersion *version.Version @@ -194,7 +194,7 @@ func NewTestClient( tfDownloadURL string, tfDownloader Downloader, usePluginCache bool, - projectCmdOutputHandler handlers.ProjectCommandOutputHandler, + projectCmdOutputHandler jobs.ProjectCommandOutputHandler, ) (*DefaultClient, error) { return NewClientWithDefaultVersion( log, @@ -231,7 +231,7 @@ func NewClient( tfDownloadURL string, tfDownloader Downloader, usePluginCache bool, - projectCmdOutputHandler handlers.ProjectCommandOutputHandler, + projectCmdOutputHandler jobs.ProjectCommandOutputHandler, ) (*DefaultClient, error) { return NewClientWithDefaultVersion( log, @@ -437,7 +437,7 @@ func (c *DefaultClient) RunCommandAsync(ctx models.ProjectCommandContext, path s for s.Scan() { message := s.Text() outCh <- Line{Line: message} - c.projectCmdOutputHandler.Send(ctx, message) + c.projectCmdOutputHandler.Send(ctx, message, false) } wg.Done() }() @@ -446,7 +446,7 @@ func (c *DefaultClient) RunCommandAsync(ctx models.ProjectCommandContext, path s for s.Scan() { message := s.Text() outCh <- Line{Line: message} - c.projectCmdOutputHandler.Send(ctx, message) + c.projectCmdOutputHandler.Send(ctx, message, false) } wg.Done() }() diff --git a/server/core/terraform/terraform_client_internal_test.go b/server/core/terraform/terraform_client_internal_test.go index 903724d1f7..d0d30e7592 100644 --- a/server/core/terraform/terraform_client_internal_test.go +++ b/server/core/terraform/terraform_client_internal_test.go @@ -9,7 +9,7 @@ import ( version "github.com/hashicorp/go-version" "github.com/runatlantis/atlantis/server/events/models" - handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -91,7 +91,7 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, @@ -138,7 +138,7 @@ func TestDefaultClient_RunCommandWithVersion_Error(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, @@ -181,7 +181,7 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, @@ -228,7 +228,7 @@ func TestDefaultClient_RunCommandAsync_BigOutput(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, @@ -276,7 +276,7 @@ func TestDefaultClient_RunCommandAsync_StderrOutput(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, @@ -313,7 +313,7 @@ func TestDefaultClient_RunCommandAsync_ExitOne(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, @@ -351,7 +351,7 @@ func TestDefaultClient_RunCommandAsync_Input(t *testing.T) { Ok(t, err) tmp, cleanup := TempDir(t) logger := logging.NewNoopLogger(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logger, diff --git a/server/core/terraform/terraform_client_test.go b/server/core/terraform/terraform_client_test.go index 420e55eb7a..610b262698 100644 --- a/server/core/terraform/terraform_client_test.go +++ b/server/core/terraform/terraform_client_test.go @@ -28,7 +28,7 @@ import ( "github.com/runatlantis/atlantis/server/core/terraform" "github.com/runatlantis/atlantis/server/core/terraform/mocks" "github.com/runatlantis/atlantis/server/events/models" - handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -61,7 +61,7 @@ Your version of Terraform is out of date! The latest version is 0.11.13. You can update by downloading from www.terraform.io/downloads.html ` tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -98,7 +98,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html ` logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -128,7 +128,7 @@ is 0.11.13. You can update by downloading from www.terraform.io/downloads.html func TestNewClient_NoTF(t *testing.T) { logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() defer cleanup() // Set PATH to only include our empty directory. @@ -144,7 +144,7 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) { fakeBinOut := "Terraform v0.11.10\n" logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -174,7 +174,7 @@ func TestNewClient_DefaultTFFlagInPath(t *testing.T) { func TestNewClient_DefaultTFFlagInBinDir(t *testing.T) { fakeBinOut := "Terraform v0.11.10\n" tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -203,7 +203,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) { RegisterMockTestingT(t) logger := logging.NewNoopLogger(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -245,7 +245,7 @@ func TestNewClient_DefaultTFFlagDownload(t *testing.T) { func TestNewClient_BadVersion(t *testing.T) { logger := logging.NewNoopLogger(t) _, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() defer cleanup() _, err := terraform.NewClient(logger, binDir, cacheDir, "", "", "malformed", cmd.DefaultTFVersionFlag, cmd.DefaultTFDownloadURL, nil, true, projectCmdOutputHandler) ErrEquals(t, "Malformed version: malformed", err) @@ -256,7 +256,7 @@ func TestRunCommandWithVersion_DLsTF(t *testing.T) { logger := logging.NewNoopLogger(t) RegisterMockTestingT(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() ctx := models.ProjectCommandContext{ Log: logging.NewNoopLogger(t), Workspace: "default", @@ -295,7 +295,7 @@ func TestEnsureVersion_downloaded(t *testing.T) { logger := logging.NewNoopLogger(t) RegisterMockTestingT(t) tmp, binDir, cacheDir, cleanup := mkSubDirs(t) - projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler() defer cleanup() mockDownloader := mocks.NewMockDownloader() diff --git a/server/events/mocks/matchers/jobs_pullinfo.go b/server/events/mocks/matchers/jobs_pullinfo.go new file mode 100644 index 0000000000..95e16a16fa --- /dev/null +++ b/server/events/mocks/matchers/jobs_pullinfo.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + jobs "github.com/runatlantis/atlantis/server/jobs" +) + +func AnyJobsPullInfo() jobs.PullInfo { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(jobs.PullInfo))(nil)).Elem())) + var nullValue jobs.PullInfo + return nullValue +} + +func EqJobsPullInfo(value jobs.PullInfo) jobs.PullInfo { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue jobs.PullInfo + return nullValue +} + +func NotEqJobsPullInfo(value jobs.PullInfo) jobs.PullInfo { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue jobs.PullInfo + return nullValue +} + +func JobsPullInfoThat(matcher pegomock.ArgumentMatcher) jobs.PullInfo { + pegomock.RegisterMatcher(matcher) + var nullValue jobs.PullInfo + return nullValue +} diff --git a/server/events/mocks/mock_job_id_generator.go b/server/events/mocks/mock_job_id_generator.go new file mode 100644 index 0000000000..001d437f72 --- /dev/null +++ b/server/events/mocks/mock_job_id_generator.go @@ -0,0 +1,94 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/events (interfaces: JobIDGenerator) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + "reflect" + "time" +) + +type MockJobIDGenerator struct { + fail func(message string, callerSkip ...int) +} + +func NewMockJobIDGenerator(options ...pegomock.Option) *MockJobIDGenerator { + mock := &MockJobIDGenerator{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockJobIDGenerator) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockJobIDGenerator) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockJobIDGenerator) GenerateJobID() string { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobIDGenerator().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("GenerateJobID", params, []reflect.Type{reflect.TypeOf((*string)(nil)).Elem()}) + var ret0 string + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(string) + } + } + return ret0 +} + +func (mock *MockJobIDGenerator) VerifyWasCalledOnce() *VerifierMockJobIDGenerator { + return &VerifierMockJobIDGenerator{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockJobIDGenerator) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockJobIDGenerator { + return &VerifierMockJobIDGenerator{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockJobIDGenerator) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockJobIDGenerator { + return &VerifierMockJobIDGenerator{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockJobIDGenerator) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockJobIDGenerator { + return &VerifierMockJobIDGenerator{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockJobIDGenerator struct { + mock *MockJobIDGenerator + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockJobIDGenerator) GenerateJobID() *MockJobIDGenerator_GenerateJobID_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "GenerateJobID", params, verifier.timeout) + return &MockJobIDGenerator_GenerateJobID_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobIDGenerator_GenerateJobID_OngoingVerification struct { + mock *MockJobIDGenerator + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobIDGenerator_GenerateJobID_OngoingVerification) GetCapturedArguments() { +} + +func (c *MockJobIDGenerator_GenerateJobID_OngoingVerification) GetAllCapturedArguments() { +} diff --git a/server/events/mocks/mock_job_message_sender.go b/server/events/mocks/mock_job_message_sender.go new file mode 100644 index 0000000000..e4d0c8fef5 --- /dev/null +++ b/server/events/mocks/mock_job_message_sender.go @@ -0,0 +1,106 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/events (interfaces: JobMessageSender) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + models "github.com/runatlantis/atlantis/server/events/models" + "reflect" + "time" +) + +type MockJobMessageSender struct { + fail func(message string, callerSkip ...int) +} + +func NewMockJobMessageSender(options ...pegomock.Option) *MockJobMessageSender { + mock := &MockJobMessageSender{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockJobMessageSender) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockJobMessageSender) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockJobMessageSender) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobMessageSender().") + } + params := []pegomock.Param{_param0, _param1, _param2} + pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) +} + +func (mock *MockJobMessageSender) VerifyWasCalledOnce() *VerifierMockJobMessageSender { + return &VerifierMockJobMessageSender{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockJobMessageSender) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockJobMessageSender { + return &VerifierMockJobMessageSender{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockJobMessageSender) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockJobMessageSender { + return &VerifierMockJobMessageSender{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockJobMessageSender) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockJobMessageSender { + return &VerifierMockJobMessageSender{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockJobMessageSender struct { + mock *MockJobMessageSender + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockJobMessageSender) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) *MockJobMessageSender_Send_OngoingVerification { + params := []pegomock.Param{_param0, _param1, _param2} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) + return &MockJobMessageSender_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobMessageSender_Send_OngoingVerification struct { + mock *MockJobMessageSender + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobMessageSender_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string, bool) { + _param0, _param1, _param2 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] +} + +func (c *MockJobMessageSender_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string, _param2 []bool) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.ProjectCommandContext) + } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } + _param2 = make([]bool, len(c.methodInvocations)) + for u, param := range params[2] { + _param2[u] = param.(bool) + } + } + return +} diff --git a/server/events/mocks/mock_job_url_setter.go b/server/events/mocks/mock_job_url_setter.go new file mode 100644 index 0000000000..b1df846f18 --- /dev/null +++ b/server/events/mocks/mock_job_url_setter.go @@ -0,0 +1,113 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/events (interfaces: JobURLSetter) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + models "github.com/runatlantis/atlantis/server/events/models" + "reflect" + "time" +) + +type MockJobURLSetter struct { + fail func(message string, callerSkip ...int) +} + +func NewMockJobURLSetter(options ...pegomock.Option) *MockJobURLSetter { + mock := &MockJobURLSetter{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockJobURLSetter) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockJobURLSetter) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockJobURLSetter) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) error { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobURLSetter().") + } + params := []pegomock.Param{_param0, _param1, _param2} + result := pegomock.GetGenericMockFrom(mock).Invoke("SetJobURLWithStatus", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(error) + } + } + return ret0 +} + +func (mock *MockJobURLSetter) VerifyWasCalledOnce() *VerifierMockJobURLSetter { + return &VerifierMockJobURLSetter{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockJobURLSetter) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockJobURLSetter { + return &VerifierMockJobURLSetter{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockJobURLSetter) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockJobURLSetter { + return &VerifierMockJobURLSetter{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockJobURLSetter) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockJobURLSetter { + return &VerifierMockJobURLSetter{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockJobURLSetter struct { + mock *MockJobURLSetter + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockJobURLSetter) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) *MockJobURLSetter_SetJobURLWithStatus_OngoingVerification { + params := []pegomock.Param{_param0, _param1, _param2} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetJobURLWithStatus", params, verifier.timeout) + return &MockJobURLSetter_SetJobURLWithStatus_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobURLSetter_SetJobURLWithStatus_OngoingVerification struct { + mock *MockJobURLSetter + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobURLSetter_SetJobURLWithStatus_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, models.CommandName, models.CommitStatus) { + _param0, _param1, _param2 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] +} + +func (c *MockJobURLSetter_SetJobURLWithStatus_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []models.CommandName, _param2 []models.CommitStatus) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(models.ProjectCommandContext) + } + _param1 = make([]models.CommandName, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(models.CommandName) + } + _param2 = make([]models.CommitStatus, len(c.methodInvocations)) + for u, param := range params[2] { + _param2[u] = param.(models.CommitStatus) + } + } + return +} diff --git a/server/handlers/mocks/mock_resource_cleaner.go b/server/events/mocks/mock_resource_cleaner.go similarity index 85% rename from server/handlers/mocks/mock_resource_cleaner.go rename to server/events/mocks/mock_resource_cleaner.go index 430dd2709f..bd4d5d1294 100644 --- a/server/handlers/mocks/mock_resource_cleaner.go +++ b/server/events/mocks/mock_resource_cleaner.go @@ -1,10 +1,11 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ResourceCleaner) +// Source: github.com/runatlantis/atlantis/server/events (interfaces: ResourceCleaner) package mocks import ( pegomock "github.com/petergtz/pegomock" + jobs "github.com/runatlantis/atlantis/server/jobs" "reflect" "time" ) @@ -24,7 +25,7 @@ func NewMockResourceCleaner(options ...pegomock.Option) *MockResourceCleaner { func (mock *MockResourceCleaner) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockResourceCleaner) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockResourceCleaner) CleanUp(_param0 string) { +func (mock *MockResourceCleaner) CleanUp(_param0 jobs.PullInfo) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockResourceCleaner().") } @@ -69,7 +70,7 @@ type VerifierMockResourceCleaner struct { timeout time.Duration } -func (verifier *VerifierMockResourceCleaner) CleanUp(_param0 string) *MockResourceCleaner_CleanUp_OngoingVerification { +func (verifier *VerifierMockResourceCleaner) CleanUp(_param0 jobs.PullInfo) *MockResourceCleaner_CleanUp_OngoingVerification { params := []pegomock.Param{_param0} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) return &MockResourceCleaner_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} @@ -80,17 +81,17 @@ type MockResourceCleaner_CleanUp_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetCapturedArguments() string { +func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetCapturedArguments() jobs.PullInfo { _param0 := c.GetAllCapturedArguments() return _param0[len(_param0)-1] } -func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { +func (c *MockResourceCleaner_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []jobs.PullInfo) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]string, len(c.methodInvocations)) + _param0 = make([]jobs.PullInfo, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(string) + _param0[u] = param.(jobs.PullInfo) } } return diff --git a/server/events/models/models.go b/server/events/models/models.go index 50a77e963b..287c7a9fff 100644 --- a/server/events/models/models.go +++ b/server/events/models/models.go @@ -33,7 +33,6 @@ import ( const ( planfileSlashReplace = "::" - LogStreamingClearMsg = "\n-----Starting New Process-----" ) type PullReqStatus struct { @@ -412,6 +411,8 @@ type ProjectCommandContext struct { PolicySets valid.PolicySets // DeleteSourceBranchOnMerge will attempt to allow a branch to be deleted when merged (AzureDevOps & GitLab Support Only) DeleteSourceBranchOnMerge bool + // UUID for atlantis logs + JobID string } // GetShowResultFileName returns the filename (not the path) to store the tf show result @@ -690,14 +691,6 @@ func (c CommandName) TitleString() string { return strings.Title(strings.ReplaceAll(strings.ToLower(c.String()), "_", " ")) } -type ProjectCmdOutputLine struct { - ProjectInfo string - - Line string - - ClearBuffBefore bool -} - // String returns the string representation of c. func (c CommandName) String() string { switch c { diff --git a/server/events/project_command_builder.go b/server/events/project_command_builder.go index f256d64c50..eb121213c1 100644 --- a/server/events/project_command_builder.go +++ b/server/events/project_command_builder.go @@ -54,7 +54,7 @@ func NewProjectCommandBuilder( SkipCloneNoChanges: skipCloneNoChanges, EnableRegExpCmd: EnableRegExpCmd, AutoplanFileList: AutoplanFileList, - ProjectCommandContextBuilder: NewProjectCommandContextBulder( + ProjectCommandContextBuilder: NewProjectCommandContextBuilder( policyChecksSupported, commentBuilder, ), diff --git a/server/events/project_command_builder_internal_test.go b/server/events/project_command_builder_internal_test.go index 9ddc5c23d3..9f484b11cc 100644 --- a/server/events/project_command_builder_internal_test.go +++ b/server/events/project_command_builder_internal_test.go @@ -658,6 +658,9 @@ projects: c.expCtx.Steps = expSteps ctx.PolicySets = emptyPolicySets + // Job ID cannot be compared since its generated at random + ctx.JobID = "" + Equals(t, c.expCtx, ctx) // Equals() doesn't compare TF version properly so have to // use .String(). @@ -849,6 +852,10 @@ projects: // Init fields we couldn't in our cases map. c.expCtx.Steps = expSteps ctx.PolicySets = emptyPolicySets + + // Job ID cannot be compared since its generated at random + ctx.JobID = "" + Equals(t, c.expCtx, ctx) // Equals() doesn't compare TF version properly so have to // use .String(). @@ -1064,6 +1071,9 @@ workflows: c.expCtx.Steps = expSteps ctx.PolicySets = emptyPolicySets + // Job ID cannot be compared since its generated at random + ctx.JobID = "" + Equals(t, c.expCtx, ctx) // Equals() doesn't compare TF version properly so have to // use .String(). diff --git a/server/events/project_command_context_builder.go b/server/events/project_command_context_builder.go index 2cc5c674ba..719bbfb62b 100644 --- a/server/events/project_command_context_builder.go +++ b/server/events/project_command_context_builder.go @@ -4,13 +4,14 @@ import ( "path/filepath" "regexp" + "github.com/google/uuid" "github.com/hashicorp/go-version" "github.com/hashicorp/terraform-config-inspect/tfconfig" "github.com/runatlantis/atlantis/server/core/config/valid" "github.com/runatlantis/atlantis/server/events/models" ) -func NewProjectCommandContextBulder(policyCheckEnabled bool, commentBuilder CommentBuilder) ProjectCommandContextBuilder { +func NewProjectCommandContextBuilder(policyCheckEnabled bool, commentBuilder CommentBuilder) ProjectCommandContextBuilder { projectCommandContextBuilder := &DefaultProjectCommandContextBuilder{ CommentBuilder: commentBuilder, } @@ -217,6 +218,7 @@ func newProjectCommandContext(ctx *CommandContext, Workspace: projCfg.Workspace, PolicySets: policySets, PullReqStatus: pullStatus, + JobID: uuid.New().String(), } } diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go index 86a41d32e8..84e0f7929e 100644 --- a/server/events/project_command_runner.go +++ b/server/events/project_command_runner.go @@ -24,10 +24,11 @@ import ( "github.com/runatlantis/atlantis/server/core/runtime" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/webhooks" - "github.com/runatlantis/atlantis/server/handlers" "github.com/runatlantis/atlantis/server/logging" ) +const OperationComplete = true + // DirNotExistErr is an error caused by the directory not existing. type DirNotExistErr struct { RepoRelDir string @@ -115,29 +116,45 @@ type ProjectCommandRunner interface { ProjectVersionCommandRunner } +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_url_setter.go JobURLSetter + +type JobURLSetter interface { + // SetJobURLWithStatus sets the commit status for the project represented by + // ctx and updates the status with and url to a job. + SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_message_sender.go JobMessageSender + +type JobMessageSender interface { + Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) +} + // ProjectOutputWrapper is a decorator that creates a new PR status check per project. // The status contains a url that outputs current progress of the terraform plan/apply command. type ProjectOutputWrapper struct { ProjectCommandRunner - ProjectCmdOutputHandler handlers.ProjectCommandOutputHandler + JobMessageSender JobMessageSender + JobURLSetter JobURLSetter } func (p *ProjectOutputWrapper) Plan(ctx models.ProjectCommandContext) models.ProjectResult { - // Reset the buffer when running the plan. We only need to do this for plan, - // apply is a continuation of the same workflow - p.ProjectCmdOutputHandler.Clear(ctx) - return p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan) + result := p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan) + p.JobMessageSender.Send(ctx, "", OperationComplete) + return result } func (p *ProjectOutputWrapper) Apply(ctx models.ProjectCommandContext) models.ProjectResult { - return p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply) + result := p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply) + p.JobMessageSender.Send(ctx, "", OperationComplete) + return result } func (p *ProjectOutputWrapper) updateProjectPRStatus(commandName models.CommandName, ctx models.ProjectCommandContext, execute func(ctx models.ProjectCommandContext) models.ProjectResult) models.ProjectResult { // Create a PR status to track project's plan status. The status will // include a link to view the progress of atlantis plan command in real // time - if err := p.ProjectCmdOutputHandler.SetJobURLWithStatus(ctx, commandName, models.PendingCommitStatus); err != nil { + if err := p.JobURLSetter.SetJobURLWithStatus(ctx, commandName, models.PendingCommitStatus); err != nil { ctx.Log.Err("updating project PR status", err) } @@ -145,14 +162,14 @@ func (p *ProjectOutputWrapper) updateProjectPRStatus(commandName models.CommandN result := execute(ctx) if result.Error != nil || result.Failure != "" { - if err := p.ProjectCmdOutputHandler.SetJobURLWithStatus(ctx, commandName, models.FailedCommitStatus); err != nil { + if err := p.JobURLSetter.SetJobURLWithStatus(ctx, commandName, models.FailedCommitStatus); err != nil { ctx.Log.Err("updating project PR status", err) } return result } - if err := p.ProjectCmdOutputHandler.SetJobURLWithStatus(ctx, commandName, models.SuccessCommitStatus); err != nil { + if err := p.JobURLSetter.SetJobURLWithStatus(ctx, commandName, models.SuccessCommitStatus); err != nil { ctx.Log.Err("updating project PR status", err) } diff --git a/server/events/project_command_runner_test.go b/server/events/project_command_runner_test.go index 052dff4288..5b964c2ea0 100644 --- a/server/events/project_command_runner_test.go +++ b/server/events/project_command_runner_test.go @@ -26,9 +26,9 @@ import ( tmocks "github.com/runatlantis/atlantis/server/core/terraform/mocks" "github.com/runatlantis/atlantis/server/events" "github.com/runatlantis/atlantis/server/events/mocks" + eventmocks "github.com/runatlantis/atlantis/server/events/mocks" "github.com/runatlantis/atlantis/server/events/mocks/matchers" "github.com/runatlantis/atlantis/server/events/models" - handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" "github.com/runatlantis/atlantis/server/logging" . "github.com/runatlantis/atlantis/testing" ) @@ -188,12 +188,14 @@ func TestProjectOutputWrapper(t *testing.T) { var prjResult models.ProjectResult var expCommitStatus models.CommitStatus - mockProjectCommandOutputHandler := handlermocks.NewMockProjectCommandOutputHandler() + mockJobURLSetter := eventmocks.NewMockJobURLSetter() + mockJobMessageSender := eventmocks.NewMockJobMessageSender() mockProjectCommandRunner := mocks.NewMockProjectCommandRunner() runner := &events.ProjectOutputWrapper{ - ProjectCmdOutputHandler: mockProjectCommandOutputHandler, - ProjectCommandRunner: mockProjectCommandRunner, + JobURLSetter: mockJobURLSetter, + JobMessageSender: mockJobMessageSender, + ProjectCommandRunner: mockProjectCommandRunner, } if c.Success { @@ -224,8 +226,8 @@ func TestProjectOutputWrapper(t *testing.T) { runner.Apply(ctx) } - mockProjectCommandOutputHandler.VerifyWasCalled(Once()).SetJobURLWithStatus(ctx, c.CommandName, models.PendingCommitStatus) - mockProjectCommandOutputHandler.VerifyWasCalled(Once()).SetJobURLWithStatus(ctx, c.CommandName, expCommitStatus) + mockJobURLSetter.VerifyWasCalled(Once()).SetJobURLWithStatus(ctx, c.CommandName, models.PendingCommitStatus) + mockJobURLSetter.VerifyWasCalled(Once()).SetJobURLWithStatus(ctx, c.CommandName, expCommitStatus) switch c.CommandName { case models.PlanCommand: diff --git a/server/events/pull_closed_executor.go b/server/events/pull_closed_executor.go index 4b78751c0d..27e2836de4 100644 --- a/server/events/pull_closed_executor.go +++ b/server/events/pull_closed_executor.go @@ -29,9 +29,15 @@ import ( "github.com/runatlantis/atlantis/server/core/locking" "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/vcs" - "github.com/runatlantis/atlantis/server/handlers" + "github.com/runatlantis/atlantis/server/jobs" ) +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_resource_cleaner.go ResourceCleaner + +type ResourceCleaner interface { + CleanUp(pullInfo jobs.PullInfo) +} + //go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_pull_cleaner.go PullCleaner // PullCleaner cleans up pull requests after they're closed/merged. @@ -50,7 +56,7 @@ type PullClosedExecutor struct { Logger logging.SimpleLogging DB *db.BoltDB PullClosedTemplate PullCleanupTemplate - LogStreamResourceCleaner handlers.ResourceCleaner + LogStreamResourceCleaner ResourceCleaner } type templatedProject struct { @@ -83,13 +89,13 @@ func (p *PullClosedExecutor) CleanUpPull(repo models.Repo, pull models.PullReque if pullStatus != nil { for _, project := range pullStatus.Projects { - normalizedOwner := strings.ReplaceAll(pullStatus.Pull.BaseRepo.Owner, "/", "-") - normalizedName := strings.ReplaceAll(pullStatus.Pull.BaseRepo.Name, "/", "-") - projectRepo := fmt.Sprintf("%s/%s", normalizedOwner, normalizedName) - - projectKey := models.BuildPullInfo(projectRepo, pull.Num, project.ProjectName, project.RepoRelDir, project.Workspace) - - p.LogStreamResourceCleaner.CleanUp(projectKey) + jobContext := jobs.PullInfo{ + PullNum: pull.Num, + Repo: pull.BaseRepo.Name, + Workspace: project.Workspace, + ProjectName: project.ProjectName, + } + p.LogStreamResourceCleaner.CleanUp(jobContext) } } diff --git a/server/events/pull_closed_executor_test.go b/server/events/pull_closed_executor_test.go index c1d1f8f9bb..0941c1f1b0 100644 --- a/server/events/pull_closed_executor_test.go +++ b/server/events/pull_closed_executor_test.go @@ -19,7 +19,7 @@ import ( "github.com/pkg/errors" "github.com/runatlantis/atlantis/server/core/db" - "github.com/runatlantis/atlantis/server/handlers" + "github.com/runatlantis/atlantis/server/jobs" "github.com/stretchr/testify/assert" bolt "go.etcd.io/bbolt" @@ -31,7 +31,6 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/models/fixtures" vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks" - handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks" loggermocks "github.com/runatlantis/atlantis/server/logging/mocks" . "github.com/runatlantis/atlantis/testing" ) @@ -194,12 +193,10 @@ func TestCleanUpLogStreaming(t *testing.T) { RegisterMockTestingT(t) t.Run("Should Clean Up Log Streaming Resources When PR is closed", func(t *testing.T) { - prjStatusUpdater := handlermocks.NewMockProjectStatusUpdater() - prjJobURLGenerator := handlermocks.NewMockProjectJobURLGenerator() // Create Log streaming resources - prjCmdOutput := make(chan *models.ProjectCmdOutputLine) - prjCmdOutHandler := handlers.NewAsyncProjectCommandOutputHandler(prjCmdOutput, prjStatusUpdater, prjJobURLGenerator, logger) + prjCmdOutput := make(chan *jobs.ProjectCmdOutputLine) + prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger) ctx := models.ProjectCommandContext{ BaseRepo: fixtures.GithubRepo, Pull: fixtures.Pull, @@ -208,7 +205,7 @@ func TestCleanUpLogStreaming(t *testing.T) { } go prjCmdOutHandler.Handle() - prjCmdOutHandler.Send(ctx, "Test Message") + prjCmdOutHandler.Send(ctx, "Test Message", false) // Create boltdb and add pull request. var lockBucket = "bucket" @@ -281,7 +278,7 @@ func TestCleanUpLogStreaming(t *testing.T) { Equals(t, expectedComment, comment) // Assert log streaming resources are cleaned up. - dfPrjCmdOutputHandler := prjCmdOutHandler.(*handlers.AsyncProjectCommandOutputHandler) + dfPrjCmdOutputHandler := prjCmdOutHandler.(*jobs.AsyncProjectCommandOutputHandler) assert.Empty(t, dfPrjCmdOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) assert.Empty(t, dfPrjCmdOutputHandler.GetReceiverBufferForPull(ctx.PullInfo())) }) diff --git a/server/handlers/mocks/matchers/http_header.go b/server/handlers/mocks/matchers/http_header.go deleted file mode 100644 index 7531557917..0000000000 --- a/server/handlers/mocks/matchers/http_header.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - http "net/http" -) - -func AnyHttpHeader() http.Header { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(http.Header))(nil)).Elem())) - var nullValue http.Header - return nullValue -} - -func EqHttpHeader(value http.Header) http.Header { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue http.Header - return nullValue -} - -func NotEqHttpHeader(value http.Header) http.Header { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue http.Header - return nullValue -} - -func HttpHeaderThat(matcher pegomock.ArgumentMatcher) http.Header { - pegomock.RegisterMatcher(matcher) - var nullValue http.Header - return nullValue -} diff --git a/server/handlers/mocks/matchers/http_responsewriter.go b/server/handlers/mocks/matchers/http_responsewriter.go deleted file mode 100644 index 1927eca531..0000000000 --- a/server/handlers/mocks/matchers/http_responsewriter.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - http "net/http" -) - -func AnyHttpResponseWriter() http.ResponseWriter { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(http.ResponseWriter))(nil)).Elem())) - var nullValue http.ResponseWriter - return nullValue -} - -func EqHttpResponseWriter(value http.ResponseWriter) http.ResponseWriter { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue http.ResponseWriter - return nullValue -} - -func NotEqHttpResponseWriter(value http.ResponseWriter) http.ResponseWriter { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue http.ResponseWriter - return nullValue -} - -func HttpResponseWriterThat(matcher pegomock.ArgumentMatcher) http.ResponseWriter { - pegomock.RegisterMatcher(matcher) - var nullValue http.ResponseWriter - return nullValue -} diff --git a/server/handlers/mocks/matchers/map_of_chan_of_string_to_bool.go b/server/handlers/mocks/matchers/map_of_chan_of_string_to_bool.go deleted file mode 100644 index 5cd33d3bac..0000000000 --- a/server/handlers/mocks/matchers/map_of_chan_of_string_to_bool.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" -) - -func AnyMapOfChanOfStringToBool() map[chan string]bool { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(map[chan string]bool))(nil)).Elem())) - var nullValue map[chan string]bool - return nullValue -} - -func EqMapOfChanOfStringToBool(value map[chan string]bool) map[chan string]bool { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue map[chan string]bool - return nullValue -} - -func NotEqMapOfChanOfStringToBool(value map[chan string]bool) map[chan string]bool { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue map[chan string]bool - return nullValue -} - -func MapOfChanOfStringToBoolThat(matcher pegomock.ArgumentMatcher) map[chan string]bool { - pegomock.RegisterMatcher(matcher) - var nullValue map[chan string]bool - return nullValue -} diff --git a/server/handlers/mocks/matchers/ptr_to_http_request.go b/server/handlers/mocks/matchers/ptr_to_http_request.go deleted file mode 100644 index dfbfc18674..0000000000 --- a/server/handlers/mocks/matchers/ptr_to_http_request.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" - - http "net/http" -) - -func AnyPtrToHttpRequest() *http.Request { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(*http.Request))(nil)).Elem())) - var nullValue *http.Request - return nullValue -} - -func EqPtrToHttpRequest(value *http.Request) *http.Request { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue *http.Request - return nullValue -} - -func NotEqPtrToHttpRequest(value *http.Request) *http.Request { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue *http.Request - return nullValue -} - -func PtrToHttpRequestThat(matcher pegomock.ArgumentMatcher) *http.Request { - pegomock.RegisterMatcher(matcher) - var nullValue *http.Request - return nullValue -} diff --git a/server/handlers/mocks/matchers/slice_of_byte.go b/server/handlers/mocks/matchers/slice_of_byte.go deleted file mode 100644 index 9515313456..0000000000 --- a/server/handlers/mocks/matchers/slice_of_byte.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" -) - -func AnySliceOfByte() []byte { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*([]byte))(nil)).Elem())) - var nullValue []byte - return nullValue -} - -func EqSliceOfByte(value []byte) []byte { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue []byte - return nullValue -} - -func NotEqSliceOfByte(value []byte) []byte { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue []byte - return nullValue -} - -func SliceOfByteThat(matcher pegomock.ArgumentMatcher) []byte { - pegomock.RegisterMatcher(matcher) - var nullValue []byte - return nullValue -} diff --git a/server/handlers/mocks/matchers/slice_of_string.go b/server/handlers/mocks/matchers/slice_of_string.go deleted file mode 100644 index f9281819dd..0000000000 --- a/server/handlers/mocks/matchers/slice_of_string.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "github.com/petergtz/pegomock" - "reflect" -) - -func AnySliceOfString() []string { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*([]string))(nil)).Elem())) - var nullValue []string - return nullValue -} - -func EqSliceOfString(value []string) []string { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue []string - return nullValue -} - -func NotEqSliceOfString(value []string) []string { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue []string - return nullValue -} - -func SliceOfStringThat(matcher pegomock.ArgumentMatcher) []string { - pegomock.RegisterMatcher(matcher) - var nullValue []string - return nullValue -} diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go deleted file mode 100644 index b530aa5ef3..0000000000 --- a/server/handlers/project_command_output_handler.go +++ /dev/null @@ -1,232 +0,0 @@ -package handlers - -import ( - "sync" - - "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/logging" -) - -// AsyncProjectCommandOutputHandler is a handler to transport terraform client -// outputs to the front end. -type AsyncProjectCommandOutputHandler struct { - projectCmdOutput chan *models.ProjectCmdOutputLine - - projectOutputBuffers map[string][]string - projectOutputBuffersLock sync.RWMutex - - receiverBuffers map[string]map[chan string]bool - receiverBuffersLock sync.RWMutex - - projectStatusUpdater ProjectStatusUpdater - projectJobURLGenerator ProjectJobURLGenerator - - logger logging.SimpleLogging -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_job_url_generator.go ProjectJobURLGenerator - -// ProjectJobURLGenerator generates urls to view project's progress. -type ProjectJobURLGenerator interface { - GenerateProjectJobURL(p models.ProjectCommandContext) (string, error) -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_status_updater.go ProjectStatusUpdater - -type ProjectStatusUpdater interface { - // UpdateProject sets the commit status for the project represented by - // ctx. - UpdateProject(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus, url string) error -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_command_output_handler.go ProjectCommandOutputHandler - -type ProjectCommandOutputHandler interface { - // Clear clears the buffer from previous terraform output lines - Clear(ctx models.ProjectCommandContext) - - // Send will enqueue the msg and wait for Handle() to receive the message. - Send(ctx models.ProjectCommandContext, msg string) - - // Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting - // to read the channel in the same goroutine - Register(projectInfo string, receiver chan string) - - // Deregister removes a channel from successive updates and closes it. - Deregister(projectInfo string, receiver chan string) - - // Listens for msg from channel - Handle() - - // SetJobURLWithStatus sets the commit status for the project represented by - // ctx and updates the status with and url to a job. - SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error - - ResourceCleaner -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_resource_cleaner.go ResourceCleaner - -type ResourceCleaner interface { - CleanUp(pull string) -} - -func NewAsyncProjectCommandOutputHandler( - projectCmdOutput chan *models.ProjectCmdOutputLine, - projectStatusUpdater ProjectStatusUpdater, - projectJobURLGenerator ProjectJobURLGenerator, - logger logging.SimpleLogging, -) ProjectCommandOutputHandler { - return &AsyncProjectCommandOutputHandler{ - projectCmdOutput: projectCmdOutput, - logger: logger, - receiverBuffers: map[string]map[chan string]bool{}, - projectStatusUpdater: projectStatusUpdater, - projectJobURLGenerator: projectJobURLGenerator, - projectOutputBuffers: map[string][]string{}, - } -} - -func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { - p.projectCmdOutput <- &models.ProjectCmdOutputLine{ - ProjectInfo: ctx.PullInfo(), - Line: msg, - } -} - -func (p *AsyncProjectCommandOutputHandler) Register(projectInfo string, receiver chan string) { - p.addChan(receiver, projectInfo) -} - -func (p *AsyncProjectCommandOutputHandler) Handle() { - for msg := range p.projectCmdOutput { - if msg.ClearBuffBefore { - p.clearLogLines(msg.ProjectInfo) - } - p.writeLogLine(msg.ProjectInfo, msg.Line) - } -} - -func (p *AsyncProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) { - p.projectCmdOutput <- &models.ProjectCmdOutputLine{ - ProjectInfo: ctx.PullInfo(), - Line: models.LogStreamingClearMsg, - ClearBuffBefore: true, - } -} - -func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { - url, err := p.projectJobURLGenerator.GenerateProjectJobURL(ctx) - - if err != nil { - return err - } - return p.projectStatusUpdater.UpdateProject(ctx, cmdName, status, url) -} - -func (p *AsyncProjectCommandOutputHandler) clearLogLines(pull string) { - p.projectOutputBuffersLock.Lock() - delete(p.projectOutputBuffers, pull) - p.projectOutputBuffersLock.Unlock() -} - -func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, pull string) { - p.projectOutputBuffersLock.RLock() - buffer := p.projectOutputBuffers[pull] - p.projectOutputBuffersLock.RUnlock() - - for _, line := range buffer { - ch <- line - } - - // add the channel to our registry after we backfill the contents of the buffer, - // to prevent new messages coming in interleaving with this backfill. - p.receiverBuffersLock.Lock() - if p.receiverBuffers[pull] == nil { - p.receiverBuffers[pull] = map[chan string]bool{} - } - p.receiverBuffers[pull][ch] = true - p.receiverBuffersLock.Unlock() -} - -//Add log line to buffer and send to all current channels -func (p *AsyncProjectCommandOutputHandler) writeLogLine(pull string, line string) { - p.receiverBuffersLock.Lock() - for ch := range p.receiverBuffers[pull] { - select { - case ch <- line: - default: - // Client ws conn could be closed in two ways: - // 1. Client closes the conn gracefully -> the closeHandler() is executed which - // closes the channel and cleans up resources. - // 2. Client does not close the conn and the closeHandler() is not executed -> the - // receiverChan will be blocking for N number of messages (equal to buffer size) - // before we delete the channel and clean up the resources. - delete(p.receiverBuffers[pull], ch) - } - } - p.receiverBuffersLock.Unlock() - - // No need to write to projectOutputBuffers if clear msg. - if line == models.LogStreamingClearMsg { - return - } - - p.projectOutputBuffersLock.Lock() - if p.projectOutputBuffers[pull] == nil { - p.projectOutputBuffers[pull] = []string{} - } - p.projectOutputBuffers[pull] = append(p.projectOutputBuffers[pull], line) - p.projectOutputBuffersLock.Unlock() -} - -//Remove channel, so client no longer receives Terraform output -func (p *AsyncProjectCommandOutputHandler) Deregister(pull string, ch chan string) { - p.logger.Debug("Removing channel for %s", pull) - p.receiverBuffersLock.Lock() - delete(p.receiverBuffers[pull], ch) - p.receiverBuffersLock.Unlock() -} - -func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(pull string) map[chan string]bool { - return p.receiverBuffers[pull] -} - -func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(pull string) []string { - return p.projectOutputBuffers[pull] -} - -func (p *AsyncProjectCommandOutputHandler) CleanUp(pull string) { - p.projectOutputBuffersLock.Lock() - delete(p.projectOutputBuffers, pull) - p.projectOutputBuffersLock.Unlock() - - // Only delete the pull record from receiver buffers. - // WS channel will be closed when the user closes the browser tab - // in closeHanlder(). - p.receiverBuffersLock.Lock() - delete(p.receiverBuffers, pull) - p.receiverBuffersLock.Unlock() -} - -// NoopProjectOutputHandler is a mock that doesn't do anything -type NoopProjectOutputHandler struct{} - -func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { -} - -func (p *NoopProjectOutputHandler) Register(projectInfo string, receiver chan string) {} -func (p *NoopProjectOutputHandler) Deregister(projectInfo string, receiver chan string) {} - -func (p *NoopProjectOutputHandler) Handle() { -} - -func (p *NoopProjectOutputHandler) Clear(ctx models.ProjectCommandContext) { -} - -func (p *NoopProjectOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { - return nil -} - -func (p *NoopProjectOutputHandler) CleanUp(pull string) { -} diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go deleted file mode 100644 index f644881815..0000000000 --- a/server/handlers/project_command_output_handler_test.go +++ /dev/null @@ -1,216 +0,0 @@ -package handlers_test - -import ( - "errors" - "sync" - "testing" - - "github.com/runatlantis/atlantis/server/events/models" - "github.com/runatlantis/atlantis/server/handlers" - "github.com/runatlantis/atlantis/server/handlers/mocks" - "github.com/runatlantis/atlantis/server/handlers/mocks/matchers" - "github.com/runatlantis/atlantis/server/logging" - "github.com/stretchr/testify/assert" - - . "github.com/petergtz/pegomock" - . "github.com/runatlantis/atlantis/testing" -) - -func createTestProjectCmdContext(t *testing.T) models.ProjectCommandContext { - logger := logging.NewNoopLogger(t) - return models.ProjectCommandContext{ - BaseRepo: models.Repo{ - Name: "test-repo", - Owner: "test-org", - }, - HeadRepo: models.Repo{ - Name: "test-repo", - Owner: "test-org", - }, - Pull: models.PullRequest{ - Num: 1, - HeadBranch: "master", - BaseBranch: "master", - Author: "test-user", - }, - User: models.User{ - Username: "test-user", - }, - Log: logger, - Workspace: "myworkspace", - RepoRelDir: "test-dir", - ProjectName: "test-project", - } -} - -func createProjectCommandOutputHandler(t *testing.T) handlers.ProjectCommandOutputHandler { - logger := logging.NewNoopLogger(t) - prjCmdOutputChan := make(chan *models.ProjectCmdOutputLine) - projectStatusUpdater := mocks.NewMockProjectStatusUpdater() - projectJobURLGenerator := mocks.NewMockProjectJobURLGenerator() - prjCmdOutputHandler := handlers.NewAsyncProjectCommandOutputHandler( - prjCmdOutputChan, - projectStatusUpdater, - projectJobURLGenerator, - logger, - ) - - go func() { - prjCmdOutputHandler.Handle() - }() - - return prjCmdOutputHandler -} - -func TestProjectCommandOutputHandler(t *testing.T) { - Msg := "Test Terraform Output" - ctx := createTestProjectCmdContext(t) - - t.Run("receive message from main channel", func(t *testing.T) { - var wg sync.WaitGroup - var expectedMsg string - projectOutputHandler := createProjectCommandOutputHandler(t) - - ch := make(chan string) - - // register channel and backfill from buffer - // Note: We call this synchronously because otherwise - // there could be a race where we are unable to register the channel - // before sending messages due to the way we lock our buffer memory cache - projectOutputHandler.Register(ctx.PullInfo(), ch) - - wg.Add(1) - - // read from channel - go func() { - for msg := range ch { - expectedMsg = msg - wg.Done() - } - }() - - projectOutputHandler.Send(ctx, Msg) - wg.Wait() - close(ch) - - // Wait for the msg to be read. - wg.Wait() - Equals(t, expectedMsg, Msg) - }) - - t.Run("clear buffer", func(t *testing.T) { - var wg sync.WaitGroup - - projectOutputHandler := createProjectCommandOutputHandler(t) - - ch := make(chan string) - - // register channel and backfill from buffer - // Note: We call this synchronously because otherwise - // there could be a race where we are unable to register the channel - // before sending messages due to the way we lock our buffer memory cache - projectOutputHandler.Register(ctx.PullInfo(), ch) - - wg.Add(1) - // read from channel asynchronously - go func() { - for msg := range ch { - // we are done once we receive the clear message. - // prior message doesn't matter for this test. - if msg == models.LogStreamingClearMsg { - wg.Done() - } - } - }() - - // send regular message followed by clear message - projectOutputHandler.Send(ctx, Msg) - projectOutputHandler.Clear(ctx) - wg.Wait() - close(ch) - - dfProjectOutputHandler, ok := projectOutputHandler.(*handlers.AsyncProjectCommandOutputHandler) - assert.True(t, ok) - - assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) - }) - - t.Run("copies buffer to new channels", func(t *testing.T) { - var wg sync.WaitGroup - - projectOutputHandler := createProjectCommandOutputHandler(t) - - // send first message to populated the buffer - projectOutputHandler.Send(ctx, Msg) - - ch := make(chan string) - - receivedMsgs := []string{} - - wg.Add(1) - // read from channel asynchronously - go func() { - for msg := range ch { - receivedMsgs = append(receivedMsgs, msg) - - // we're only expecting two messages here. - if len(receivedMsgs) >= 2 { - wg.Done() - } - } - }() - // register channel and backfill from buffer - // Note: We call this synchronously because otherwise - // there could be a race where we are unable to register the channel - // before sending messages due to the way we lock our buffer memory cache - projectOutputHandler.Register(ctx.PullInfo(), ch) - - projectOutputHandler.Send(ctx, Msg) - wg.Wait() - close(ch) - - expectedMsgs := []string{Msg, Msg} - assert.Equal(t, len(expectedMsgs), len(receivedMsgs)) - for i := range expectedMsgs { - assert.Equal(t, expectedMsgs[i], receivedMsgs[i]) - } - }) - - t.Run("update project status with project jobs url", func(t *testing.T) { - RegisterMockTestingT(t) - logger := logging.NewNoopLogger(t) - prjCmdOutputChan := make(chan *models.ProjectCmdOutputLine) - projectStatusUpdater := mocks.NewMockProjectStatusUpdater() - projectJobURLGenerator := mocks.NewMockProjectJobURLGenerator() - prjCmdOutputHandler := handlers.NewAsyncProjectCommandOutputHandler( - prjCmdOutputChan, - projectStatusUpdater, - projectJobURLGenerator, - logger, - ) - - When(projectJobURLGenerator.GenerateProjectJobURL(matchers.EqModelsProjectCommandContext(ctx))).ThenReturn("url-to-project-jobs", nil) - err := prjCmdOutputHandler.SetJobURLWithStatus(ctx, models.PlanCommand, models.PendingCommitStatus) - Ok(t, err) - - projectStatusUpdater.VerifyWasCalledOnce().UpdateProject(ctx, models.PlanCommand, models.PendingCommitStatus, "url-to-project-jobs") - }) - - t.Run("update project status with project jobs url error", func(t *testing.T) { - RegisterMockTestingT(t) - logger := logging.NewNoopLogger(t) - prjCmdOutputChan := make(chan *models.ProjectCmdOutputLine) - projectStatusUpdater := mocks.NewMockProjectStatusUpdater() - projectJobURLGenerator := mocks.NewMockProjectJobURLGenerator() - prjCmdOutputHandler := handlers.NewAsyncProjectCommandOutputHandler( - prjCmdOutputChan, - projectStatusUpdater, - projectJobURLGenerator, - logger, - ) - - When(projectJobURLGenerator.GenerateProjectJobURL(matchers.EqModelsProjectCommandContext(ctx))).ThenReturn("url-to-project-jobs", errors.New("some error")) - err := prjCmdOutputHandler.SetJobURLWithStatus(ctx, models.PlanCommand, models.PendingCommitStatus) - assert.Error(t, err) - }) -} diff --git a/server/handlers/websocket_handler.go b/server/handlers/websocket_handler.go deleted file mode 100644 index 3b98a5a54c..0000000000 --- a/server/handlers/websocket_handler.go +++ /dev/null @@ -1,61 +0,0 @@ -package handlers - -import ( - "net/http" - - "github.com/gorilla/websocket" - "github.com/runatlantis/atlantis/server/logging" -) - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_handler.go WebsocketHandler - -type WebsocketHandler interface { - Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketConnectionWrapper, error) - SetReadHandler(w WebsocketConnectionWrapper) - SetCloseHandler(w WebsocketConnectionWrapper, receiver chan string) -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_websocket_connection_wrapper.go WebsocketConnectionWrapper - -type WebsocketConnectionWrapper interface { - ReadMessage() (messageType int, p []byte, err error) - WriteMessage(messageType int, data []byte) error - SetCloseHandler(h func(code int, text string) error) -} - -type DefaultWebsocketHandler struct { - handler websocket.Upgrader - Logger logging.SimpleLogging -} - -func NewWebsocketHandler(logger logging.SimpleLogging) WebsocketHandler { - h := websocket.Upgrader{} - h.CheckOrigin = func(r *http.Request) bool { return true } - return &DefaultWebsocketHandler{ - handler: h, - Logger: logger, - } -} - -func (wh *DefaultWebsocketHandler) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (WebsocketConnectionWrapper, error) { - return wh.handler.Upgrade(w, r, responseHeader) -} - -func (wh *DefaultWebsocketHandler) SetReadHandler(w WebsocketConnectionWrapper) { - for { - _, _, err := w.ReadMessage() - if err != nil { - wh.Logger.Warn("Failed to read WS message: %s", err) - return - } - } -} - -func (wh *DefaultWebsocketHandler) SetCloseHandler(w WebsocketConnectionWrapper, receiver chan string) { - w.SetCloseHandler(func(code int, text string) error { - // Close the channnel after websocket connection closed. - // Will gracefully exit the ProjectCommandOutputHandler.Receive() call and cleanup. - close(receiver) - return nil - }) -} diff --git a/server/jobs/job_url_setter.go b/server/jobs/job_url_setter.go new file mode 100644 index 0000000000..d681935c53 --- /dev/null +++ b/server/jobs/job_url_setter.go @@ -0,0 +1,39 @@ +package jobs + +import "github.com/runatlantis/atlantis/server/events/models" + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_job_url_generator.go ProjectJobURLGenerator + +// ProjectJobURLGenerator generates urls to view project's progress. +type ProjectJobURLGenerator interface { + GenerateProjectJobURL(p models.ProjectCommandContext) (string, error) +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_status_updater.go ProjectStatusUpdater + +type ProjectStatusUpdater interface { + // UpdateProject sets the commit status for the project represented by + // ctx. + UpdateProject(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus, url string) error +} + +type JobURLSetter struct { + projectJobURLGenerator ProjectJobURLGenerator + projectStatusUpdater ProjectStatusUpdater +} + +func NewJobURLSetter(projectJobURLGenerator ProjectJobURLGenerator, projectStatusUpdater ProjectStatusUpdater) *JobURLSetter { + return &JobURLSetter{ + projectJobURLGenerator: projectJobURLGenerator, + projectStatusUpdater: projectStatusUpdater, + } +} + +func (j *JobURLSetter) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error { + url, err := j.projectJobURLGenerator.GenerateProjectJobURL(ctx) + + if err != nil { + return err + } + return j.projectStatusUpdater.UpdateProject(ctx, cmdName, status, url) +} diff --git a/server/jobs/job_url_setter_test.go b/server/jobs/job_url_setter_test.go new file mode 100644 index 0000000000..5ae8073c65 --- /dev/null +++ b/server/jobs/job_url_setter_test.go @@ -0,0 +1,44 @@ +package jobs_test + +import ( + "errors" + "testing" + + . "github.com/petergtz/pegomock" + "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/jobs" + "github.com/runatlantis/atlantis/server/jobs/mocks" + "github.com/runatlantis/atlantis/server/jobs/mocks/matchers" + . "github.com/runatlantis/atlantis/testing" + "github.com/stretchr/testify/assert" +) + +func TestJobURLSetter(t *testing.T) { + ctx := createTestProjectCmdContext(t) + + t.Run("update project status with project jobs url", func(t *testing.T) { + RegisterMockTestingT(t) + projectStatusUpdater := mocks.NewMockProjectStatusUpdater() + projectJobURLGenerator := mocks.NewMockProjectJobURLGenerator() + url := "url-to-project-jobs" + jobURLSetter := jobs.NewJobURLSetter(projectJobURLGenerator, projectStatusUpdater) + + When(projectJobURLGenerator.GenerateProjectJobURL(matchers.EqModelsProjectCommandContext(ctx))).ThenReturn(url, nil) + When(projectStatusUpdater.UpdateProject(ctx, models.PlanCommand, models.PendingCommitStatus, url)).ThenReturn(nil) + err := jobURLSetter.SetJobURLWithStatus(ctx, models.PlanCommand, models.PendingCommitStatus) + Ok(t, err) + + projectStatusUpdater.VerifyWasCalledOnce().UpdateProject(ctx, models.PlanCommand, models.PendingCommitStatus, "url-to-project-jobs") + }) + + t.Run("update project status with project jobs url error", func(t *testing.T) { + RegisterMockTestingT(t) + projectStatusUpdater := mocks.NewMockProjectStatusUpdater() + projectJobURLGenerator := mocks.NewMockProjectJobURLGenerator() + jobURLSetter := jobs.NewJobURLSetter(projectJobURLGenerator, projectStatusUpdater) + + When(projectJobURLGenerator.GenerateProjectJobURL(matchers.EqModelsProjectCommandContext(ctx))).ThenReturn("url-to-project-jobs", errors.New("some error")) + err := jobURLSetter.SetJobURLWithStatus(ctx, models.PlanCommand, models.PendingCommitStatus) + assert.Error(t, err) + }) +} diff --git a/server/handlers/mocks/matchers/chan_of_string.go b/server/jobs/mocks/matchers/chan_of_string.go similarity index 100% rename from server/handlers/mocks/matchers/chan_of_string.go rename to server/jobs/mocks/matchers/chan_of_string.go diff --git a/server/jobs/mocks/matchers/jobs_pullinfo.go b/server/jobs/mocks/matchers/jobs_pullinfo.go new file mode 100644 index 0000000000..95e16a16fa --- /dev/null +++ b/server/jobs/mocks/matchers/jobs_pullinfo.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + jobs "github.com/runatlantis/atlantis/server/jobs" +) + +func AnyJobsPullInfo() jobs.PullInfo { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(jobs.PullInfo))(nil)).Elem())) + var nullValue jobs.PullInfo + return nullValue +} + +func EqJobsPullInfo(value jobs.PullInfo) jobs.PullInfo { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue jobs.PullInfo + return nullValue +} + +func NotEqJobsPullInfo(value jobs.PullInfo) jobs.PullInfo { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue jobs.PullInfo + return nullValue +} + +func JobsPullInfoThat(matcher pegomock.ArgumentMatcher) jobs.PullInfo { + pegomock.RegisterMatcher(matcher) + var nullValue jobs.PullInfo + return nullValue +} diff --git a/server/handlers/mocks/matchers/models_commandname.go b/server/jobs/mocks/matchers/models_commandname.go similarity index 100% rename from server/handlers/mocks/matchers/models_commandname.go rename to server/jobs/mocks/matchers/models_commandname.go diff --git a/server/handlers/mocks/matchers/models_commitstatus.go b/server/jobs/mocks/matchers/models_commitstatus.go similarity index 100% rename from server/handlers/mocks/matchers/models_commitstatus.go rename to server/jobs/mocks/matchers/models_commitstatus.go diff --git a/server/handlers/mocks/matchers/models_projectcommandcontext.go b/server/jobs/mocks/matchers/models_projectcommandcontext.go similarity index 100% rename from server/handlers/mocks/matchers/models_projectcommandcontext.go rename to server/jobs/mocks/matchers/models_projectcommandcontext.go diff --git a/server/handlers/mocks/mock_project_command_output_handler.go b/server/jobs/mocks/mock_project_command_output_handler.go similarity index 73% rename from server/handlers/mocks/mock_project_command_output_handler.go rename to server/jobs/mocks/mock_project_command_output_handler.go index f119e70290..32b6ed76a8 100644 --- a/server/handlers/mocks/mock_project_command_output_handler.go +++ b/server/jobs/mocks/mock_project_command_output_handler.go @@ -1,11 +1,12 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ProjectCommandOutputHandler) +// Source: github.com/runatlantis/atlantis/server/jobs (interfaces: ProjectCommandOutputHandler) package mocks import ( pegomock "github.com/petergtz/pegomock" models "github.com/runatlantis/atlantis/server/events/models" + jobs "github.com/runatlantis/atlantis/server/jobs" "reflect" "time" ) @@ -25,7 +26,7 @@ func NewMockProjectCommandOutputHandler(options ...pegomock.Option) *MockProject func (mock *MockProjectCommandOutputHandler) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockProjectCommandOutputHandler) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 string) { +func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 jobs.PullInfo) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } @@ -33,14 +34,6 @@ func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 string) { pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") - } - params := []pegomock.Param{_param0} - pegomock.GetGenericMockFrom(mock).Invoke("Clear", params, []reflect.Type{}) -} - func (mock *MockProjectCommandOutputHandler) Deregister(_param0 string, _param1 chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") @@ -57,35 +50,35 @@ func (mock *MockProjectCommandOutputHandler) Handle() { pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Register(_param0 string, _param1 chan string) { +func (mock *MockProjectCommandOutputHandler) IsKeyExists(_param0 string) bool { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("Register", params, []reflect.Type{}) + params := []pegomock.Param{_param0} + result := pegomock.GetGenericMockFrom(mock).Invoke("IsKeyExists", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem()}) + var ret0 bool + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(bool) + } + } + return ret0 } -func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) { +func (mock *MockProjectCommandOutputHandler) Register(_param0 string, _param1 chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) + pegomock.GetGenericMockFrom(mock).Invoke("Register", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) error { +func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } params := []pegomock.Param{_param0, _param1, _param2} - result := pegomock.GetGenericMockFrom(mock).Invoke("SetJobURLWithStatus", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 + pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) } func (mock *MockProjectCommandOutputHandler) VerifyWasCalledOnce() *VerifierMockProjectCommandOutputHandler { @@ -125,7 +118,7 @@ type VerifierMockProjectCommandOutputHandler struct { timeout time.Duration } -func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(_param0 string) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { +func (verifier *VerifierMockProjectCommandOutputHandler) CleanUp(_param0 jobs.PullInfo) *MockProjectCommandOutputHandler_CleanUp_OngoingVerification { params := []pegomock.Param{_param0} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CleanUp", params, verifier.timeout) return &MockProjectCommandOutputHandler_CleanUp_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} @@ -136,44 +129,17 @@ type MockProjectCommandOutputHandler_CleanUp_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() string { +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetCapturedArguments() jobs.PullInfo { _param0 := c.GetAllCapturedArguments() return _param0[len(_param0)-1] } -func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { +func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapturedArguments() (_param0 []jobs.PullInfo) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]string, len(c.methodInvocations)) + _param0 = make([]jobs.PullInfo, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(string) - } - } - return -} - -func (verifier *VerifierMockProjectCommandOutputHandler) Clear(_param0 models.ProjectCommandContext) *MockProjectCommandOutputHandler_Clear_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Clear", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Clear_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockProjectCommandOutputHandler_Clear_OngoingVerification struct { - mock *MockProjectCommandOutputHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockProjectCommandOutputHandler_Clear_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(models.ProjectCommandContext) + _param0[u] = param.(jobs.PullInfo) } } return @@ -227,98 +193,94 @@ func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCaptured func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { } -func (verifier *VerifierMockProjectCommandOutputHandler) Register(_param0 string, _param1 chan string) *MockProjectCommandOutputHandler_Register_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Register", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Register_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) IsKeyExists(_param0 string) *MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "IsKeyExists", params, verifier.timeout) + return &MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Register_OngoingVerification struct { +type MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetCapturedArguments() (string, chan string) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] +func (c *MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] } -func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { +func (c *MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(string) } - _param1 = make([]chan string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(chan string) - } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) *MockProjectCommandOutputHandler_Send_OngoingVerification { +func (verifier *VerifierMockProjectCommandOutputHandler) Register(_param0 string, _param1 chan string) *MockProjectCommandOutputHandler_Register_OngoingVerification { params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Register", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Register_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Send_OngoingVerification struct { +type MockProjectCommandOutputHandler_Register_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { +func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetCapturedArguments() (string, chan string) { _param0, _param1 := c.GetAllCapturedArguments() return _param0[len(_param0)-1], _param1[len(_param1)-1] } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { +func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) + _param0 = make([]string, len(c.methodInvocations)) for u, param := range params[0] { - _param0[u] = param.(models.ProjectCommandContext) + _param0[u] = param.(string) } - _param1 = make([]string, len(c.methodInvocations)) + _param1 = make([]chan string, len(c.methodInvocations)) for u, param := range params[1] { - _param1[u] = param.(string) + _param1[u] = param.(chan string) } } return } -func (verifier *VerifierMockProjectCommandOutputHandler) SetJobURLWithStatus(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus) *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification { +func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) *MockProjectCommandOutputHandler_Send_OngoingVerification { params := []pegomock.Param{_param0, _param1, _param2} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetJobURLWithStatus", params, verifier.timeout) - return &MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) + return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification struct { +type MockProjectCommandOutputHandler_Send_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, models.CommandName, models.CommitStatus) { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string, bool) { _param0, _param1, _param2 := c.GetAllCapturedArguments() return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] } -func (c *MockProjectCommandOutputHandler_SetJobURLWithStatus_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []models.CommandName, _param2 []models.CommitStatus) { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string, _param2 []bool) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(models.ProjectCommandContext) } - _param1 = make([]models.CommandName, len(c.methodInvocations)) + _param1 = make([]string, len(c.methodInvocations)) for u, param := range params[1] { - _param1[u] = param.(models.CommandName) + _param1[u] = param.(string) } - _param2 = make([]models.CommitStatus, len(c.methodInvocations)) + _param2 = make([]bool, len(c.methodInvocations)) for u, param := range params[2] { - _param2[u] = param.(models.CommitStatus) + _param2[u] = param.(bool) } } return diff --git a/server/handlers/mocks/mock_project_job_url_generator.go b/server/jobs/mocks/mock_project_job_url_generator.go similarity index 88% rename from server/handlers/mocks/mock_project_job_url_generator.go rename to server/jobs/mocks/mock_project_job_url_generator.go index 7386a383cc..385cc64102 100644 --- a/server/handlers/mocks/mock_project_job_url_generator.go +++ b/server/jobs/mocks/mock_project_job_url_generator.go @@ -1,5 +1,5 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ProjectJobURLGenerator) +// Source: github.com/runatlantis/atlantis/server/jobs (interfaces: ProjectJobURLGenerator) package mocks @@ -25,11 +25,11 @@ func NewMockProjectJobURLGenerator(options ...pegomock.Option) *MockProjectJobUR func (mock *MockProjectJobURLGenerator) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockProjectJobURLGenerator) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockProjectJobURLGenerator) GenerateProjectJobURL(p models.ProjectCommandContext) (string, error) { +func (mock *MockProjectJobURLGenerator) GenerateProjectJobURL(_param0 models.ProjectCommandContext) (string, error) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectJobURLGenerator().") } - params := []pegomock.Param{p} + params := []pegomock.Param{_param0} result := pegomock.GetGenericMockFrom(mock).Invoke("GenerateProjectJobURL", params, []reflect.Type{reflect.TypeOf((*string)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) var ret0 string var ret1 error @@ -81,8 +81,8 @@ type VerifierMockProjectJobURLGenerator struct { timeout time.Duration } -func (verifier *VerifierMockProjectJobURLGenerator) GenerateProjectJobURL(p models.ProjectCommandContext) *MockProjectJobURLGenerator_GenerateProjectJobURL_OngoingVerification { - params := []pegomock.Param{p} +func (verifier *VerifierMockProjectJobURLGenerator) GenerateProjectJobURL(_param0 models.ProjectCommandContext) *MockProjectJobURLGenerator_GenerateProjectJobURL_OngoingVerification { + params := []pegomock.Param{_param0} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "GenerateProjectJobURL", params, verifier.timeout) return &MockProjectJobURLGenerator_GenerateProjectJobURL_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -93,8 +93,8 @@ type MockProjectJobURLGenerator_GenerateProjectJobURL_OngoingVerification struct } func (c *MockProjectJobURLGenerator_GenerateProjectJobURL_OngoingVerification) GetCapturedArguments() models.ProjectCommandContext { - p := c.GetAllCapturedArguments() - return p[len(p)-1] + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] } func (c *MockProjectJobURLGenerator_GenerateProjectJobURL_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext) { diff --git a/server/handlers/mocks/mock_project_status_updater.go b/server/jobs/mocks/mock_project_status_updater.go similarity index 84% rename from server/handlers/mocks/mock_project_status_updater.go rename to server/jobs/mocks/mock_project_status_updater.go index dd5a42d40a..4ea237d0f8 100644 --- a/server/handlers/mocks/mock_project_status_updater.go +++ b/server/jobs/mocks/mock_project_status_updater.go @@ -1,5 +1,5 @@ // Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/handlers (interfaces: ProjectStatusUpdater) +// Source: github.com/runatlantis/atlantis/server/jobs (interfaces: ProjectStatusUpdater) package mocks @@ -25,11 +25,11 @@ func NewMockProjectStatusUpdater(options ...pegomock.Option) *MockProjectStatusU func (mock *MockProjectStatusUpdater) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } func (mock *MockProjectStatusUpdater) FailHandler() pegomock.FailHandler { return mock.fail } -func (mock *MockProjectStatusUpdater) UpdateProject(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus, url string) error { +func (mock *MockProjectStatusUpdater) UpdateProject(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus, _param3 string) error { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectStatusUpdater().") } - params := []pegomock.Param{ctx, cmdName, status, url} + params := []pegomock.Param{_param0, _param1, _param2, _param3} result := pegomock.GetGenericMockFrom(mock).Invoke("UpdateProject", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) var ret0 error if len(result) != 0 { @@ -77,8 +77,8 @@ type VerifierMockProjectStatusUpdater struct { timeout time.Duration } -func (verifier *VerifierMockProjectStatusUpdater) UpdateProject(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus, url string) *MockProjectStatusUpdater_UpdateProject_OngoingVerification { - params := []pegomock.Param{ctx, cmdName, status, url} +func (verifier *VerifierMockProjectStatusUpdater) UpdateProject(_param0 models.ProjectCommandContext, _param1 models.CommandName, _param2 models.CommitStatus, _param3 string) *MockProjectStatusUpdater_UpdateProject_OngoingVerification { + params := []pegomock.Param{_param0, _param1, _param2, _param3} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "UpdateProject", params, verifier.timeout) return &MockProjectStatusUpdater_UpdateProject_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -89,8 +89,8 @@ type MockProjectStatusUpdater_UpdateProject_OngoingVerification struct { } func (c *MockProjectStatusUpdater_UpdateProject_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, models.CommandName, models.CommitStatus, string) { - ctx, cmdName, status, url := c.GetAllCapturedArguments() - return ctx[len(ctx)-1], cmdName[len(cmdName)-1], status[len(status)-1], url[len(url)-1] + _param0, _param1, _param2, _param3 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1], _param3[len(_param3)-1] } func (c *MockProjectStatusUpdater_UpdateProject_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []models.CommandName, _param2 []models.CommitStatus, _param3 []string) { diff --git a/server/jobs/project_command_output_handler.go b/server/jobs/project_command_output_handler.go new file mode 100644 index 0000000000..438bd799c1 --- /dev/null +++ b/server/jobs/project_command_output_handler.go @@ -0,0 +1,266 @@ +package jobs + +import ( + "sync" + + "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/logging" +) + +type OutputBuffer struct { + OperationComplete bool + Buffer []string +} + +type PullInfo struct { + PullNum int + Repo string + ProjectName string + Workspace string +} + +type JobInfo struct { + PullInfo + HeadCommit string +} + +type ProjectCmdOutputLine struct { + JobID string + JobInfo JobInfo + Line string + OperationComplete bool +} + +// AsyncProjectCommandOutputHandler is a handler to transport terraform client +// outputs to the front end. +type AsyncProjectCommandOutputHandler struct { + projectCmdOutput chan *ProjectCmdOutputLine + + projectOutputBuffers map[string]OutputBuffer + projectOutputBuffersLock sync.RWMutex + + receiverBuffers map[string]map[chan string]bool + receiverBuffersLock sync.RWMutex + + logger logging.SimpleLogging + + // Tracks all the jobs for a pull request which is used for clean up after a pull request is closed. + pullToJobMapping sync.Map +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_command_output_handler.go ProjectCommandOutputHandler + +type ProjectCommandOutputHandler interface { + // Send will enqueue the msg and wait for Handle() to receive the message. + Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) + + // Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting + // to read the channel in the same goroutine + Register(jobID string, receiver chan string) + + // Deregister removes a channel from successive updates and closes it. + Deregister(jobID string, receiver chan string) + + IsKeyExists(key string) bool + + // Listens for msg from channel + Handle() + + // Cleans up resources for a pull + CleanUp(pullInfo PullInfo) +} + +func NewAsyncProjectCommandOutputHandler( + projectCmdOutput chan *ProjectCmdOutputLine, + logger logging.SimpleLogging, +) ProjectCommandOutputHandler { + return &AsyncProjectCommandOutputHandler{ + projectCmdOutput: projectCmdOutput, + logger: logger, + receiverBuffers: map[string]map[chan string]bool{}, + projectOutputBuffers: map[string]OutputBuffer{}, + pullToJobMapping: sync.Map{}, + } +} + +func (p *AsyncProjectCommandOutputHandler) IsKeyExists(key string) bool { + p.projectOutputBuffersLock.RLock() + defer p.projectOutputBuffersLock.RUnlock() + _, ok := p.projectOutputBuffers[key] + return ok +} + +func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) { + p.projectCmdOutput <- &ProjectCmdOutputLine{ + JobID: ctx.JobID, + JobInfo: JobInfo{ + HeadCommit: ctx.Pull.HeadCommit, + PullInfo: PullInfo{ + PullNum: ctx.Pull.Num, + Repo: ctx.BaseRepo.Name, + ProjectName: ctx.ProjectName, + Workspace: ctx.Workspace, + }, + }, + Line: msg, + OperationComplete: operationComplete, + } +} + +func (p *AsyncProjectCommandOutputHandler) Register(jobID string, receiver chan string) { + p.addChan(receiver, jobID) +} + +func (p *AsyncProjectCommandOutputHandler) Handle() { + for msg := range p.projectCmdOutput { + if msg.OperationComplete { + p.completeJob(msg.JobID) + continue + } + + // Add job to pullToJob mapping + if _, ok := p.pullToJobMapping.Load(msg.JobInfo.PullInfo); !ok { + p.pullToJobMapping.Store(msg.JobInfo.PullInfo, map[string]bool{}) + } + value, _ := p.pullToJobMapping.Load(msg.JobInfo.PullInfo) + jobMapping := value.(map[string]bool) + jobMapping[msg.JobID] = true + + // Forward new message to all receiver channels and output buffer + p.writeLogLine(msg.JobID, msg.Line) + } +} + +func (p *AsyncProjectCommandOutputHandler) completeJob(jobID string) { + p.projectOutputBuffersLock.Lock() + p.receiverBuffersLock.Lock() + defer func() { + p.projectOutputBuffersLock.Unlock() + p.receiverBuffersLock.Unlock() + }() + + // Update operation status to complete + if outputBuffer, ok := p.projectOutputBuffers[jobID]; ok { + outputBuffer.OperationComplete = true + p.projectOutputBuffers[jobID] = outputBuffer + } + + // Close active receiver channels + if openChannels, ok := p.receiverBuffers[jobID]; ok { + for ch := range openChannels { + close(ch) + } + } + +} + +func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, jobID string) { + p.projectOutputBuffersLock.RLock() + outputBuffer := p.projectOutputBuffers[jobID] + p.projectOutputBuffersLock.RUnlock() + + for _, line := range outputBuffer.Buffer { + ch <- line + } + + // No need register receiver since all the logs have been streamed + if outputBuffer.OperationComplete { + close(ch) + return + } + + // add the channel to our registry after we backfill the contents of the buffer, + // to prevent new messages coming in interleaving with this backfill. + p.receiverBuffersLock.Lock() + if p.receiverBuffers[jobID] == nil { + p.receiverBuffers[jobID] = map[chan string]bool{} + } + p.receiverBuffers[jobID][ch] = true + p.receiverBuffersLock.Unlock() +} + +//Add log line to buffer and send to all current channels +func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line string) { + p.receiverBuffersLock.Lock() + for ch := range p.receiverBuffers[jobID] { + select { + case ch <- line: + default: + // Delete buffered channel if it's blocking. + delete(p.receiverBuffers[jobID], ch) + } + } + p.receiverBuffersLock.Unlock() + + p.projectOutputBuffersLock.Lock() + if _, ok := p.projectOutputBuffers[jobID]; !ok { + p.projectOutputBuffers[jobID] = OutputBuffer{ + Buffer: []string{}, + } + } + outputBuffer := p.projectOutputBuffers[jobID] + outputBuffer.Buffer = append(outputBuffer.Buffer, line) + p.projectOutputBuffers[jobID] = outputBuffer + + p.projectOutputBuffersLock.Unlock() +} + +//Remove channel, so client no longer receives Terraform output +func (p *AsyncProjectCommandOutputHandler) Deregister(jobID string, ch chan string) { + p.logger.Debug("Removing channel for %s", jobID) + p.receiverBuffersLock.Lock() + delete(p.receiverBuffers[jobID], ch) + p.receiverBuffersLock.Unlock() +} + +func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(jobID string) map[chan string]bool { + return p.receiverBuffers[jobID] +} + +func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) OutputBuffer { + return p.projectOutputBuffers[jobID] +} + +func (p *AsyncProjectCommandOutputHandler) GetJobIDMapForPull(pullInfo PullInfo) map[string]bool { + if value, ok := p.pullToJobMapping.Load(pullInfo); ok { + return value.(map[string]bool) + } + return nil +} + +func (p *AsyncProjectCommandOutputHandler) CleanUp(pullInfo PullInfo) { + if value, ok := p.pullToJobMapping.Load(pullInfo); ok { + jobMapping := value.(map[string]bool) + for jobID := range jobMapping { + p.projectOutputBuffersLock.Lock() + delete(p.projectOutputBuffers, jobID) + p.projectOutputBuffersLock.Unlock() + + p.receiverBuffersLock.Lock() + delete(p.receiverBuffers, jobID) + p.receiverBuffersLock.Unlock() + } + + // Remove job mapping + p.pullToJobMapping.Delete(pullInfo) + } +} + +// NoopProjectOutputHandler is a mock that doesn't do anything +type NoopProjectOutputHandler struct{} + +func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string, isOperationComplete bool) { +} + +func (p *NoopProjectOutputHandler) Register(jobID string, receiver chan string) {} +func (p *NoopProjectOutputHandler) Deregister(jobID string, receiver chan string) {} + +func (p *NoopProjectOutputHandler) Handle() { +} + +func (p *NoopProjectOutputHandler) CleanUp(pullInfo PullInfo) { +} + +func (p *NoopProjectOutputHandler) IsKeyExists(key string) bool { + return false +} diff --git a/server/jobs/project_command_output_handler_test.go b/server/jobs/project_command_output_handler_test.go new file mode 100644 index 0000000000..20a67b3a0a --- /dev/null +++ b/server/jobs/project_command_output_handler_test.go @@ -0,0 +1,250 @@ +package jobs_test + +import ( + "sync" + "testing" + "time" + + "github.com/runatlantis/atlantis/server/events/models" + "github.com/runatlantis/atlantis/server/jobs" + "github.com/runatlantis/atlantis/server/logging" + . "github.com/runatlantis/atlantis/testing" + "github.com/stretchr/testify/assert" +) + +func createTestProjectCmdContext(t *testing.T) models.ProjectCommandContext { + logger := logging.NewNoopLogger(t) + return models.ProjectCommandContext{ + BaseRepo: models.Repo{ + Name: "test-repo", + Owner: "test-org", + }, + HeadRepo: models.Repo{ + Name: "test-repo", + Owner: "test-org", + }, + Pull: models.PullRequest{ + Num: 1, + HeadBranch: "master", + BaseBranch: "master", + Author: "test-user", + HeadCommit: "234r232432", + }, + User: models.User{ + Username: "test-user", + }, + Log: logger, + Workspace: "myworkspace", + RepoRelDir: "test-dir", + ProjectName: "test-project", + JobID: "1234", + } +} + +func createProjectCommandOutputHandler(t *testing.T) jobs.ProjectCommandOutputHandler { + logger := logging.NewNoopLogger(t) + prjCmdOutputChan := make(chan *jobs.ProjectCmdOutputLine) + prjCmdOutputHandler := jobs.NewAsyncProjectCommandOutputHandler( + prjCmdOutputChan, + logger, + ) + + go func() { + prjCmdOutputHandler.Handle() + }() + + return prjCmdOutputHandler +} + +func TestProjectCommandOutputHandler(t *testing.T) { + Msg := "Test Terraform Output" + ctx := createTestProjectCmdContext(t) + + t.Run("receive message from main channel", func(t *testing.T) { + var wg sync.WaitGroup + var expectedMsg string + projectOutputHandler := createProjectCommandOutputHandler(t) + + ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + wg.Add(1) + + // read from channel + go func() { + for msg := range ch { + expectedMsg = msg + wg.Done() + } + }() + + projectOutputHandler.Send(ctx, Msg, false) + wg.Wait() + close(ch) + + // Wait for the msg to be read. + wg.Wait() + Equals(t, expectedMsg, Msg) + }) + + t.Run("copies buffer to new channels", func(t *testing.T) { + var wg sync.WaitGroup + + projectOutputHandler := createProjectCommandOutputHandler(t) + + // send first message to populated the buffer + projectOutputHandler.Send(ctx, Msg, false) + + ch := make(chan string) + + receivedMsgs := []string{} + + wg.Add(1) + // read from channel asynchronously + go func() { + for msg := range ch { + receivedMsgs = append(receivedMsgs, msg) + + // we're only expecting two messages here. + if len(receivedMsgs) >= 2 { + wg.Done() + } + } + }() + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + projectOutputHandler.Send(ctx, Msg, false) + wg.Wait() + close(ch) + + expectedMsgs := []string{Msg, Msg} + assert.Equal(t, len(expectedMsgs), len(receivedMsgs)) + for i := range expectedMsgs { + assert.Equal(t, expectedMsgs[i], receivedMsgs[i]) + } + }) + + t.Run("clean up all jobs when PR is closed", func(t *testing.T) { + var wg sync.WaitGroup + projectOutputHandler := createProjectCommandOutputHandler(t) + + ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + wg.Add(1) + + // read from channel + go func() { + for msg := range ch { + if msg == "Complete" { + wg.Done() + } + } + }() + + projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, "Complete", false) + + pullContext := jobs.PullInfo{ + PullNum: ctx.Pull.Num, + Repo: ctx.BaseRepo.Name, + ProjectName: ctx.ProjectName, + Workspace: ctx.Workspace, + } + projectOutputHandler.CleanUp(pullContext) + + // Check all the resources are cleaned up. + dfProjectOutputHandler, ok := projectOutputHandler.(*jobs.AsyncProjectCommandOutputHandler) + assert.True(t, ok) + + assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.JobID)) + assert.Empty(t, dfProjectOutputHandler.GetReceiverBufferForPull(ctx.JobID)) + assert.Empty(t, dfProjectOutputHandler.GetJobIDMapForPull(pullContext)) + }) + + t.Run("mark operation status complete and close conn buffers for the job", func(t *testing.T) { + projectOutputHandler := createProjectCommandOutputHandler(t) + + ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + // read from channel + go func() { + for range ch { + } + }() + + projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, "", true) + + // Wait for the handler to process the message + time.Sleep(10 * time.Millisecond) + + dfProjectOutputHandler, ok := projectOutputHandler.(*jobs.AsyncProjectCommandOutputHandler) + assert.True(t, ok) + + outputBuffer := dfProjectOutputHandler.GetProjectOutputBuffer(ctx.JobID) + assert.True(t, outputBuffer.OperationComplete) + + _, ok = (<-ch) + assert.False(t, ok) + + }) + + t.Run("close conn buffer after streaming logs for completed operation", func(t *testing.T) { + projectOutputHandler := createProjectCommandOutputHandler(t) + + ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + // read from channel + go func() { + for range ch { + } + }() + + projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, "", true) + + // Wait for the handler to process the message + time.Sleep(10 * time.Millisecond) + + ch2 := make(chan string) + opComplete := make(chan bool) + + // buffer channel will be closed immediately after logs are streamed + go func() { + for range ch2 { + } + opComplete <- true + }() + + projectOutputHandler.Register(ctx.JobID, ch2) + + assert.True(t, <-opComplete) + }) +} diff --git a/server/router.go b/server/router.go index 33f4f2f4a9..58e2d800a5 100644 --- a/server/router.go +++ b/server/router.go @@ -3,7 +3,6 @@ package server import ( "fmt" "net/url" - "strings" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -41,17 +40,14 @@ func (r *Router) GenerateLockURL(lockID string) string { } func (r *Router) GenerateProjectJobURL(ctx models.ProjectCommandContext) (string, error) { - pull := ctx.Pull - projectIdentifier := models.GetProjectIdentifier(ctx.RepoRelDir, ctx.ProjectName) - jobURL, err := r.Underlying.Get(r.ProjectJobsViewRouteName).URL( - "org", strings.ReplaceAll(pull.BaseRepo.Owner, "/", "-"), - "repo", strings.ReplaceAll(pull.BaseRepo.Name, "/", "-"), // Account for nested repo names repo/sub-repo - "pull", fmt.Sprintf("%d", pull.Num), - "project", projectIdentifier, - "workspace", ctx.Workspace, + if ctx.JobID == "" { + return "", fmt.Errorf("no job id in ctx") + } + jobURL, err := r.Underlying.Get((r.ProjectJobsViewRouteName)).URL( + "job-id", ctx.JobID, ) if err != nil { - return "", errors.Wrapf(err, "creating job url for %s/%d/%s/%s", pull.BaseRepo.FullName, pull.Num, projectIdentifier, ctx.Workspace) + return "", errors.Wrapf(err, "creating job url for %s", ctx.JobID) } return r.AtlantisURL.String() + jobURL.String(), nil diff --git a/server/router_test.go b/server/router_test.go index 48bce1c8e9..f5af050848 100644 --- a/server/router_test.go +++ b/server/router_test.go @@ -1,13 +1,16 @@ package server_test import ( + "fmt" "net/http" "testing" + "github.com/google/uuid" "github.com/gorilla/mux" "github.com/runatlantis/atlantis/server" "github.com/runatlantis/atlantis/server/events/models" . "github.com/runatlantis/atlantis/testing" + "github.com/stretchr/testify/assert" ) func TestRouter_GenerateLockURL(t *testing.T) { @@ -67,7 +70,7 @@ func setupJobsRouter(t *testing.T) *server.Router { Ok(t, err) underlyingRouter := mux.NewRouter() - underlyingRouter.HandleFunc("/jobs/{org}/{repo}/{pull}/{project}/{workspace}", func(_ http.ResponseWriter, _ *http.Request) {}).Methods("GET").Name("project-jobs-detail") + underlyingRouter.HandleFunc("/jobs/{job-id}", func(_ http.ResponseWriter, _ *http.Request) {}).Methods("GET").Name("project-jobs-detail") return &server.Router{ AtlantisURL: atlantisURL, @@ -76,47 +79,20 @@ func setupJobsRouter(t *testing.T) *server.Router { } } -func TestGenerateProjectJobURL_ShouldGenerateURLWithProjectNameWhenProjectNameSpecified(t *testing.T) { +func TestGenerateProjectJobURL_ShouldGenerateURLWhenJobIDSpecified(t *testing.T) { router := setupJobsRouter(t) + jobID := uuid.New().String() ctx := models.ProjectCommandContext{ - Pull: models.PullRequest{ - BaseRepo: models.Repo{ - Owner: "test-owner", - Name: "test-repo", - }, - Num: 1, - }, - ProjectName: "test-project", - Workspace: "default", - } - expectedURL := "http://localhost:4141/jobs/test-owner/test-repo/1/test-project/default" - gotURL, err := router.GenerateProjectJobURL(ctx) - Ok(t, err) - - Equals(t, expectedURL, gotURL) -} - -func TestGenerateProjectJobURL_ShouldGenerateURLWithDirectoryAndWorkspaceWhenProjectNameNotSpecified(t *testing.T) { - router := setupJobsRouter(t) - ctx := models.ProjectCommandContext{ - Pull: models.PullRequest{ - BaseRepo: models.Repo{ - Owner: "test-owner", - Name: "test-repo", - }, - Num: 1, - }, - RepoRelDir: "ops/terraform/test-root", - Workspace: "default", + JobID: jobID, } - expectedURL := "http://localhost:4141/jobs/test-owner/test-repo/1/ops-terraform-test-root/default" + expectedURL := fmt.Sprintf("http://localhost:4141/jobs/%s", jobID) gotURL, err := router.GenerateProjectJobURL(ctx) Ok(t, err) Equals(t, expectedURL, gotURL) } -func TestGenerateProjectJobURL_ShouldGenerateURLWhenWorkingDirSetToBase(t *testing.T) { +func TestGenerateProjectJobURL_ShouldReturnErrorWhenJobIDNotSpecified(t *testing.T) { router := setupJobsRouter(t) ctx := models.ProjectCommandContext{ Pull: models.PullRequest{ @@ -126,52 +102,10 @@ func TestGenerateProjectJobURL_ShouldGenerateURLWhenWorkingDirSetToBase(t *testi }, Num: 1, }, - RepoRelDir: ".", - Workspace: "default", + RepoRelDir: "ops/terraform/", } - expectedURL := "http://localhost:4141/jobs/test-owner/test-repo/1/_/default" + expectedErrString := "no job id in ctx" gotURL, err := router.GenerateProjectJobURL(ctx) - Ok(t, err) - - Equals(t, expectedURL, gotURL) -} - -func TestGenerateProjectJobURL_ShouldGenerateURLWhenNestedRepo(t *testing.T) { - router := setupJobsRouter(t) - ctx := models.ProjectCommandContext{ - Pull: models.PullRequest{ - BaseRepo: models.Repo{ - Owner: "test-owner", - Name: "test-repo/sub-repo", - }, - Num: 1, - }, - RepoRelDir: "ops/terraform/test-root", - Workspace: "default", - } - expectedURL := "http://localhost:4141/jobs/test-owner/test-repo-sub-repo/1/ops-terraform-test-root/default" - gotURL, err := router.GenerateProjectJobURL(ctx) - Ok(t, err) - - Equals(t, expectedURL, gotURL) -} - -func TestGenerateProjectJobURL_ShouldGenerateURLWhenNestedOwner(t *testing.T) { - router := setupJobsRouter(t) - ctx := models.ProjectCommandContext{ - Pull: models.PullRequest{ - BaseRepo: models.Repo{ - Owner: "test-owner/sub-proj", - Name: "test-repo/sub-repo", - }, - Num: 1, - }, - RepoRelDir: "ops/terraform/test-root", - Workspace: "default", - } - expectedURL := "http://localhost:4141/jobs/test-owner-sub-proj/test-repo-sub-repo/1/ops-terraform-test-root/default" - gotURL, err := router.GenerateProjectJobURL(ctx) - Ok(t, err) - - Equals(t, expectedURL, gotURL) + assert.EqualError(t, err, expectedErrString) + Equals(t, "", gotURL) } diff --git a/server/server.go b/server/server.go index dda8c9e5d2..acccf559c3 100644 --- a/server/server.go +++ b/server/server.go @@ -33,7 +33,7 @@ import ( "github.com/mitchellh/go-homedir" "github.com/runatlantis/atlantis/server/core/config/valid" "github.com/runatlantis/atlantis/server/core/db" - "github.com/runatlantis/atlantis/server/handlers" + "github.com/runatlantis/atlantis/server/jobs" assetfs "github.com/elazarl/go-bindata-assetfs" "github.com/gorilla/mux" @@ -105,7 +105,7 @@ type Server struct { WebAuthentication bool WebUsername string WebPassword string - ProjectCmdOutputHandler handlers.ProjectCommandOutputHandler + ProjectCmdOutputHandler jobs.ProjectCommandOutputHandler } // Config holds config for server that isn't passed in by the user. @@ -310,17 +310,15 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { Underlying: underlyingRouter, } - var projectCmdOutputHandler handlers.ProjectCommandOutputHandler + var projectCmdOutputHandler jobs.ProjectCommandOutputHandler // When TFE is enabled log streaming is not necessary. if userConfig.TFEToken != "" { - projectCmdOutputHandler = &handlers.NoopProjectOutputHandler{} + projectCmdOutputHandler = &jobs.NoopProjectOutputHandler{} } else { - projectCmdOutput := make(chan *models.ProjectCmdOutputLine) - projectCmdOutputHandler = handlers.NewAsyncProjectCommandOutputHandler( + projectCmdOutput := make(chan *jobs.ProjectCmdOutputLine) + projectCmdOutputHandler = jobs.NewAsyncProjectCommandOutputHandler( projectCmdOutput, - commitStatusUpdater, - router, logger, ) } @@ -555,8 +553,9 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { } projectOutputWrapper := &events.ProjectOutputWrapper{ - ProjectCmdOutputHandler: projectCmdOutputHandler, - ProjectCommandRunner: projectCommandRunner, + JobMessageSender: projectCmdOutputHandler, + ProjectCommandRunner: projectCommandRunner, + JobURLSetter: jobs.NewJobURLSetter(router, commitStatusUpdater), } policyCheckCommandRunner := events.NewPolicyCheckCommandRunner( @@ -681,7 +680,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { wsMux := websocket.NewMultiplexor( logger, - controllers.ProjectInfoKeyGenerator{}, + controllers.JobIDKeyGenerator{}, projectCmdOutputHandler, ) @@ -693,6 +692,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { ProjectJobsErrorTemplate: templates.ProjectJobsErrorTemplate, Db: boltdb, WsMux: wsMux, + KeyGenerator: controllers.JobIDKeyGenerator{}, } eventsController := &events_controllers.VCSEventsController{ @@ -768,8 +768,8 @@ func (s *Server) Start() error { s.Router.HandleFunc("/locks", s.LocksController.DeleteLock).Methods("DELETE").Queries("id", "{id:.*}") s.Router.HandleFunc("/lock", s.LocksController.GetLock).Methods("GET"). Queries(LockViewRouteIDQueryParam, fmt.Sprintf("{%s}", LockViewRouteIDQueryParam)).Name(LockViewRouteName) - s.Router.HandleFunc("/jobs/{org}/{repo}/{pull}/{project}/{workspace}", s.JobsController.GetProjectJobs).Methods("GET").Name(ProjectJobsViewRouteName) - s.Router.HandleFunc("/jobs/{org}/{repo}/{pull}/{project}/{workspace}/ws", s.JobsController.GetProjectJobsWS).Methods("GET") + s.Router.HandleFunc("/jobs/{job-id}", s.JobsController.GetProjectJobs).Methods("GET").Name(ProjectJobsViewRouteName) + s.Router.HandleFunc("/jobs/{job-id}/ws", s.JobsController.GetProjectJobsWS).Methods("GET") n := negroni.New(&negroni.Recovery{ Logger: log.New(os.Stdout, "", log.LstdFlags),