Skip to content

Commit

Permalink
Refactor job controller and websocket logic. (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
nishkrishnan authored and Aayyush committed Dec 9, 2021
1 parent 4370d14 commit 5c1acd9
Show file tree
Hide file tree
Showing 27 changed files with 556 additions and 1,048 deletions.
2 changes: 1 addition & 1 deletion server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
"github.com/runatlantis/atlantis/server/events/webhooks"
"github.com/runatlantis/atlantis/server/events/yaml"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"strconv"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
stats "github.com/lyft/gostats"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/controllers/templates"
"github.com/runatlantis/atlantis/server/controllers/websocket"
"github.com/runatlantis/atlantis/server/core/db"
"github.com/runatlantis/atlantis/server/events/metrics"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
)

Expand All @@ -25,10 +25,20 @@ type JobsController struct {
ProjectJobsTemplate templates.TemplateWriter
ProjectJobsErrorTemplate templates.TemplateWriter
Db *db.BoltDB
WsMux *websocket.Multiplexor
StatsScope stats.Scope
}

type ProjectInfoKeyGenerator struct{}

func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) {
projectInfo, err := newProjectInfo(r)

if err != nil {
return "", errors.Wrap(err, "creating project info")
}

WebsocketHandler handlers.WebsocketHandler
ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler
StatsScope stats.Scope
return projectInfo.String(), nil
}

type pullInfo struct {
Expand Down Expand Up @@ -132,37 +142,9 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request)
}

func (j *JobsController) getProjectJobsWS(w http.ResponseWriter, r *http.Request) error {
projectInfo, err := newProjectInfo(r)
if err != nil {
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return err
}

c, err := j.WebsocketHandler.Upgrade(w, r, nil)
if err != nil {
j.Logger.Warn("Failed to upgrade websocket: %s", err)
return err
}

// Buffer size set to 1000 to ensure messages get queued (upto 1000) if the receiverCh is not ready to
// receive messages before the channel is closed and resources cleaned up.
receiver := make(chan string, 1000)
j.WebsocketHandler.SetCloseHandler(c, receiver)

// Add a reader goroutine to listen for socket.close() events.
go j.WebsocketHandler.SetReadHandler(c)

pull := projectInfo.String()
err = j.ProjectCommandOutputHandler.Receive(pull, receiver, func(msg string) error {
if err := c.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
j.Logger.Warn("Failed to write ws message: %s", err)
return err
}
return nil
})
err := j.WsMux.Handle(w, r)

if err != nil {
j.Logger.Warn("Failed to receive message: %s", err)
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return err
}
Expand Down
45 changes: 0 additions & 45 deletions server/controllers/logstreaming_controller_test.go

This file was deleted.

63 changes: 63 additions & 0 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package websocket

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/logging"
)

// PartitionKeyGenerator generates partition keys for the multiplexor
type PartitionKeyGenerator interface {
Generate(r *http.Request) (string, error)
}

// PartitionRegistry is the registry holding each partition
// and is responsible for registering/deregistering new buffers
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
}

// Multiplexor is responsible for handling the data transfer between the storage layer
// and the registry. Note this is still a WIP as right now the registry is assumed to handle
// everything.
type Multiplexor struct {
writer *Writer
keyGenerator PartitionKeyGenerator
registry PartitionRegistry
}

func NewMultiplexor(log logging.SimpleLogging, keyGenerator PartitionKeyGenerator, registry PartitionRegistry) *Multiplexor {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Multiplexor{
writer: &Writer{
upgrader: upgrader,
log: log,
},
keyGenerator: keyGenerator,
registry: registry,
}
}

// Handle should be called for a given websocket request. It blocks
// while writing to the websocket until the buffer is closed.
func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
key, err := m.keyGenerator.Generate(r)

if err != nil {
return errors.Wrapf(err, "generating partition key")
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)

// spinning up a goroutine for this since we are attempting to block on the read side.
go m.registry.Register(key, buffer)
defer m.registry.Deregister(key, buffer)

return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key)
}
70 changes: 70 additions & 0 deletions server/controllers/websocket/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package websocket

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/logging"
)

func NewWriter(log logging.SimpleLogging) *Writer {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Writer{
upgrader: upgrader,
log: log,
}
}

type Writer struct {
upgrader websocket.Upgrader

//TODO: Remove dependency on atlantis logger here if we upstream this.
log logging.SimpleLogging
}

func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error {
conn, err := w.upgrader.Upgrade(rw, r, nil)

if err != nil {
return errors.Wrap(err, "upgrading websocket connection")
}

conn.SetCloseHandler(func(code int, text string) error {
// Close the channnel after websocket connection closed.
// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
// is it good practice to close at the receiver? Probably not, we should figure out a better
// way to handle this case
close(input)
return nil
})

// Add a reader goroutine to listen for socket.close() events.
go w.setReadHandler(conn)

// block on reading our input channel
for msg := range input {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
w.log.Warn("Failed to write ws message: %s", err)
return err
}
}

return nil
}

func (w *Writer) setReadHandler(c *websocket.Conn) {
for {
_, _, err := c.ReadMessage()
if err != nil {
// CloseGoingAway (1001) when a browser tab is closed.
// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
w.log.Warn("Failed to read WS message: %s", err)
}
return
}
}

}
2 changes: 1 addition & 1 deletion server/core/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (

"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/terraform/ansi"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
)

var LogStreamingValidCmds = [...]string{"init", "plan", "apply"}
Expand Down
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
version "github.com/hashicorp/go-version"
. "github.com/petergtz/pegomock"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
1 change: 1 addition & 0 deletions server/events/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/runatlantis/atlantis/server/events/vcs"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
"github.com/runatlantis/atlantis/server/recovery"
gitlab "github.com/xanzy/go-gitlab"
)
Expand Down
43 changes: 43 additions & 0 deletions server/events/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/models/fixtures"
vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand All @@ -46,6 +48,7 @@ var azuredevopsGetter *mocks.MockAzureDevopsPullGetter
var githubGetter *mocks.MockGithubPullGetter
var gitlabGetter *mocks.MockGitlabMergeRequestGetter
var ch events.DefaultCommandRunner
var fa events.FeatureAwareCommandRunner
var workingDir events.WorkingDir
var pendingPlanFinder *mocks.MockPendingPlanFinder
var drainer *events.Drainer
Expand Down Expand Up @@ -382,6 +385,46 @@ func TestRunCommentCommand_DisableApplyAllDisabled(t *testing.T) {
vcsClient.VerifyWasCalledOnce().CreateComment(fixtures.GithubRepo, modelPull.Num, "**Error:** Running `atlantis apply` without flags is disabled. You must specify which project to apply via the `-d <dir>`, `-w <workspace>` or `-p <project name>` flags.", "apply")
}

func TestFeatureAwareRunCommentCommandRunner_CommentWhenEnabled(t *testing.T) {
t.Log("if \"atlantis apply --force\" is run and this is enabled atlantis should" +
" comment with a warning")
vcsClient := setup(t)
allocator := fmocks.NewMockAllocator()

fa = events.FeatureAwareCommandRunner{
CommandRunner: &ch,
VCSClient: vcsClient,
Logger: logger,
FeatureAllocator: allocator,
}

modelPull := models.PullRequest{BaseRepo: fixtures.GithubRepo, State: models.OpenPullState, Num: fixtures.Pull.Num}
When(allocator.ShouldAllocate(feature.ForceApply, "runatlantis/atlantis")).ThenReturn(true, nil)

fa.RunCommentCommand(fixtures.GithubRepo, nil, nil, fixtures.User, modelPull.Num, &events.CommentCommand{Name: models.ApplyCommand, ForceApply: true})
vcsClient.VerifyWasCalledOnce().CreateComment(fixtures.GithubRepo, modelPull.Num, "⚠️ WARNING ⚠️\n\n You have bypassed all apply requirements for this PR 🚀 . This can have unpredictable consequences 🙏🏽 and should only be used in an emergency 🆘 .\n\n 𝐓𝐡𝐢𝐬 𝐚𝐜𝐭𝐢𝐨𝐧 𝐰𝐢𝐥𝐥 𝐛𝐞 𝐚𝐮𝐝𝐢𝐭𝐞𝐝.\n", "")
}

func TestFeatureAwareRunCommentCommandRunner_NoCommentWhenDisabled(t *testing.T) {
t.Log("if \"atlantis apply --force\" is run and this is disabled atlantis should" +
" not comment with a warning")
vcsClient := setup(t)
allocator := fmocks.NewMockAllocator()

fa = events.FeatureAwareCommandRunner{
CommandRunner: &ch,
VCSClient: vcsClient,
Logger: logger,
FeatureAllocator: allocator,
}

modelPull := models.PullRequest{BaseRepo: fixtures.GithubRepo, State: models.OpenPullState, Num: fixtures.Pull.Num}
When(allocator.ShouldAllocate(feature.ForceApply, "runatlantis/atlantis")).ThenReturn(false, nil)

fa.RunCommentCommand(fixtures.GithubRepo, nil, nil, fixtures.User, modelPull.Num, &events.CommentCommand{Name: models.ApplyCommand, ForceApply: true})
vcsClient.VerifyWasCalled(Never()).CreateComment(fixtures.GithubRepo, modelPull.Num, "⚠️ WARNING ⚠️\n\n You have bypassed all apply requirements for this PR 🚀 . This can have unpredictable consequences 🙏🏽 and should only be used in an emergency 🆘 .\n\n 𝐓𝐡𝐢𝐬 𝐚𝐜𝐭𝐢𝐨𝐧 𝐰𝐢𝐥𝐥 𝐛𝐞 𝐚𝐮𝐝𝐢𝐭𝐞𝐝.\n", "")
}

func TestRunCommentCommand_DisableDisableAutoplan(t *testing.T) {
t.Log("if \"DisableAutoplan is true\" are disabled and we are silencing return and do not comment with error")
setup(t)
Expand Down
1 change: 1 addition & 0 deletions server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
)

// DirNotExistErr is an error caused by the directory not existing.
Expand Down
2 changes: 2 additions & 0 deletions server/events/project_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/runatlantis/atlantis/server/events/yaml/valid"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
Loading

0 comments on commit 5c1acd9

Please sign in to comment.