Skip to content

Commit

Permalink
feat: Use UUIDs to identify log streaming jobs (#2051)
Browse files Browse the repository at this point in the history
* Add UUID for Log Streaming Job ID (#167)

* Update log handler to close buffered  channels when an operation is complete (#170)

* Add preliminary check before registering new receivers in the log handler (#173)

* Using projectOutputBuffers to check for jobID instead of receiverBuffers (#181)

* Refactor log handler  (#175)

* Reverting go.mod and go.sum

* Fix lint errors

* Fix linting
  • Loading branch information
Aayyush authored Feb 9, 2022
1 parent bbd539b commit 58e9b42
Show file tree
Hide file tree
Showing 45 changed files with 1,213 additions and 1,111 deletions.
4 changes: 2 additions & 2 deletions server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/runatlantis/atlantis/server/events/vcs"
vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks"
"github.com/runatlantis/atlantis/server/events/webhooks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)
Expand Down Expand Up @@ -836,7 +836,7 @@ func setupE2E(t *testing.T, repoDir string) (events_controllers.VCSEventsControl
e2eStatusUpdater := &events.DefaultCommitStatusUpdater{Client: e2eVCSClient}
e2eGithubGetter := mocks.NewMockGithubPullGetter()
e2eGitlabGetter := mocks.NewMockGitlabMergeRequestGetter()
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

// Real dependencies.
logger := logging.NewNoopLogger(t)
Expand Down
110 changes: 18 additions & 92 deletions server/controllers/jobs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ import (
"net/http"
"net/url"

"strconv"

"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/controllers/templates"
"github.com/runatlantis/atlantis/server/controllers/websocket"
"github.com/runatlantis/atlantis/server/core/db"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/logging"
)

type JobIDKeyGenerator struct{}

func (g JobIDKeyGenerator) Generate(r *http.Request) (string, error) {
jobID, ok := mux.Vars(r)["job-id"]
if !ok {
return "", fmt.Errorf("internal error: no job-id in route")
}

return jobID, nil
}

type JobsController struct {
AtlantisVersion string
AtlantisURL *url.URL
Expand All @@ -24,105 +31,24 @@ type JobsController struct {
ProjectJobsErrorTemplate templates.TemplateWriter
Db *db.BoltDB
WsMux *websocket.Multiplexor
}

type ProjectInfoKeyGenerator struct{}

func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) {
projectInfo, err := newProjectInfo(r)

if err != nil {
return "", errors.Wrap(err, "creating project info")
}

return projectInfo.String(), nil
}

type pullInfo struct {
org string
repo string
pull int
}

func (p *pullInfo) String() string {
return fmt.Sprintf("%s/%s/%d", p.org, p.repo, p.pull)
}

type projectInfo struct {
projectName string
workspace string
pullInfo
}

func (p *projectInfo) String() string {
return fmt.Sprintf("%s/%s/%d/%s/%s", p.org, p.repo, p.pull, p.projectName, p.workspace)
}

func newPullInfo(r *http.Request) (*pullInfo, error) {
org, ok := mux.Vars(r)["org"]
if !ok {
return nil, fmt.Errorf("Internal error: no org in route")
}
repo, ok := mux.Vars(r)["repo"]
if !ok {
return nil, fmt.Errorf("Internal error: no repo in route")
}
pull, ok := mux.Vars(r)["pull"]
if !ok {
return nil, fmt.Errorf("Internal error: no pull in route")
}
pullNum, err := strconv.Atoi(pull)
if err != nil {
return nil, err
}

return &pullInfo{
org: org,
repo: repo,
pull: pullNum,
}, nil
}

// Gets the PR information from the HTTP request params
func newProjectInfo(r *http.Request) (*projectInfo, error) {
pullInfo, err := newPullInfo(r)
if err != nil {
return nil, err
}

project, ok := mux.Vars(r)["project"]
if !ok {
return nil, fmt.Errorf("Internal error: no project in route")
}

workspace, ok := mux.Vars(r)["workspace"]
if !ok {
return nil, fmt.Errorf("Internal error: no workspace in route")
}

return &projectInfo{
pullInfo: *pullInfo,
projectName: project,
workspace: workspace,
}, nil
KeyGenerator JobIDKeyGenerator
}

func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request) {
projectInfo, err := newProjectInfo(r)
jobID, err := j.KeyGenerator.Generate(r)

if err != nil {
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
j.respond(w, logging.Error, http.StatusBadRequest, err.Error())
return
}

viewData := templates.ProjectJobData{
AtlantisVersion: j.AtlantisVersion,
ProjectPath: projectInfo.String(),
ProjectPath: jobID,
CleanedBasePath: j.AtlantisURL.Path,
ClearMsg: models.LogStreamingClearMsg,
}

err = j.ProjectJobsTemplate.Execute(w, viewData)
if err != nil {
if err = j.ProjectJobsTemplate.Execute(w, viewData); err != nil {
j.Logger.Err(err.Error())
}
}
Expand All @@ -131,7 +57,7 @@ func (j *JobsController) GetProjectJobsWS(w http.ResponseWriter, r *http.Request
err := j.WsMux.Handle(w, r)

if err != nil {
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
j.respond(w, logging.Error, http.StatusBadRequest, err.Error())
return
}
}
Expand Down
11 changes: 3 additions & 8 deletions server/controllers/templates/web_templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ type ProjectJobData struct {
AtlantisVersion string
ProjectPath string
CleanedBasePath string
ClearMsg string
}

var ProjectJobsTemplate = template.Must(template.New("blank.html.tmpl").Parse(`
Expand Down Expand Up @@ -417,13 +416,9 @@ var ProjectJobsTemplate = template.Must(template.New("blank.html.tmpl").Parse(`
document.location.host +
document.location.pathname +
"/ws");
socket.onmessage = function(event) {
var msg = String.fromCharCode.apply(null, new Uint8Array(event.data))
if (msg.trim() === "-----Starting New Process-----") {
term.clear()
return
}
}
window.addEventListener("unload", function(event) {
websocket.close();
})
var attachAddon = new AttachAddon.AttachAddon(socket);
var fitAddon = new FitAddon.FitAddon();
term.loadAddon(attachAddon);
Expand Down
7 changes: 7 additions & 0 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package websocket

import (
"fmt"
"net/http"

"github.com/gorilla/websocket"
Expand All @@ -18,6 +19,7 @@ type PartitionKeyGenerator interface {
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
IsKeyExists(key string) bool
}

// Multiplexor is responsible for handling the data transfer between the storage layer
Expand Down Expand Up @@ -51,6 +53,11 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
return errors.Wrapf(err, "generating partition key")
}

// check if the job ID exists before registering receiver
if !m.registry.IsKeyExists(key) {
return fmt.Errorf("invalid key: %s", key)
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)
Expand Down
31 changes: 4 additions & 27 deletions server/controllers/websocket/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,6 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
return errors.Wrap(err, "upgrading websocket connection")
}

conn.SetCloseHandler(func(code int, text string) error {
// Close the channnel after websocket connection closed.
// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
// is it good practice to close at the receiver? Probably not, we should figure out a better
// way to handle this case
close(input)
return nil
})

// Add a reader goroutine to listen for socket.close() events.
go w.setReadHandler(conn)

// block on reading our input channel
for msg := range input {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
Expand All @@ -49,20 +37,9 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
}
}

return nil
}

func (w *Writer) setReadHandler(c *websocket.Conn) {
for {
_, _, err := c.ReadMessage()
if err != nil {
// CloseGoingAway (1001) when a browser tab is closed.
// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
w.log.Warn("Failed to read WS message: %s", err)
}
return
}
// close ws conn after input channel is closed
if err = conn.Close(); err != nil {
w.log.Warn("Failed to close ws connection: %s", err)
}

return nil
}
14 changes: 7 additions & 7 deletions server/core/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/terraform/ansi"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/jobs"
"github.com/runatlantis/atlantis/server/logging"
)

Expand Down Expand Up @@ -79,7 +79,7 @@ type DefaultClient struct {
// usePluginCache determines whether or not to set the TF_PLUGIN_CACHE_DIR env var
usePluginCache bool

projectCmdOutputHandler handlers.ProjectCommandOutputHandler
projectCmdOutputHandler jobs.ProjectCommandOutputHandler
}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_downloader.go Downloader
Expand Down Expand Up @@ -111,7 +111,7 @@ func NewClientWithDefaultVersion(
tfDownloader Downloader,
usePluginCache bool,
fetchAsync bool,
projectCmdOutputHandler handlers.ProjectCommandOutputHandler,
projectCmdOutputHandler jobs.ProjectCommandOutputHandler,
) (*DefaultClient, error) {
var finalDefaultVersion *version.Version
var localVersion *version.Version
Expand Down Expand Up @@ -194,7 +194,7 @@ func NewTestClient(
tfDownloadURL string,
tfDownloader Downloader,
usePluginCache bool,
projectCmdOutputHandler handlers.ProjectCommandOutputHandler,
projectCmdOutputHandler jobs.ProjectCommandOutputHandler,
) (*DefaultClient, error) {
return NewClientWithDefaultVersion(
log,
Expand Down Expand Up @@ -231,7 +231,7 @@ func NewClient(
tfDownloadURL string,
tfDownloader Downloader,
usePluginCache bool,
projectCmdOutputHandler handlers.ProjectCommandOutputHandler,
projectCmdOutputHandler jobs.ProjectCommandOutputHandler,
) (*DefaultClient, error) {
return NewClientWithDefaultVersion(
log,
Expand Down Expand Up @@ -437,7 +437,7 @@ func (c *DefaultClient) RunCommandAsync(ctx models.ProjectCommandContext, path s
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message)
c.projectCmdOutputHandler.Send(ctx, message, false)
}
wg.Done()
}()
Expand All @@ -446,7 +446,7 @@ func (c *DefaultClient) RunCommandAsync(ctx models.ProjectCommandContext, path s
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message)
c.projectCmdOutputHandler.Send(ctx, message, false)
}
wg.Done()
}()
Expand Down
16 changes: 8 additions & 8 deletions server/core/terraform/terraform_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

version "github.com/hashicorp/go-version"
"github.com/runatlantis/atlantis/server/events/models"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
"github.com/runatlantis/atlantis/server/logging"
. "github.com/runatlantis/atlantis/testing"
)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestDefaultClient_RunCommandWithVersion_EnvVars(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestDefaultClient_RunCommandWithVersion_Error(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestDefaultClient_RunCommandAsync_Success(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestDefaultClient_RunCommandAsync_BigOutput(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestDefaultClient_RunCommandAsync_StderrOutput(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestDefaultClient_RunCommandAsync_ExitOne(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestDefaultClient_RunCommandAsync_Input(t *testing.T) {
Ok(t, err)
tmp, cleanup := TempDir(t)
logger := logging.NewNoopLogger(t)
projectCmdOutputHandler := handlermocks.NewMockProjectCommandOutputHandler()
projectCmdOutputHandler := jobmocks.NewMockProjectCommandOutputHandler()

ctx := models.ProjectCommandContext{
Log: logger,
Expand Down
Loading

0 comments on commit 58e9b42

Please sign in to comment.