Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor job controller and websocket logic. #150

Merged
merged 11 commits into from
Dec 2, 2021
2 changes: 1 addition & 1 deletion server/controllers/events/events_controller_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
"github.com/runatlantis/atlantis/server/events/webhooks"
"github.com/runatlantis/atlantis/server/events/yaml"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"strconv"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
stats "github.com/lyft/gostats"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/controllers/templates"
"github.com/runatlantis/atlantis/server/controllers/websocket"
"github.com/runatlantis/atlantis/server/core/db"
"github.com/runatlantis/atlantis/server/events/metrics"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
)

Expand All @@ -25,10 +25,20 @@ type JobsController struct {
ProjectJobsTemplate templates.TemplateWriter
ProjectJobsErrorTemplate templates.TemplateWriter
Db *db.BoltDB
WsMux *websocket.Multiplexor
StatsScope stats.Scope
}

type ProjectInfoKeyGenerator struct{}

func (g ProjectInfoKeyGenerator) Generate(r *http.Request) (string, error) {
projectInfo, err := newProjectInfo(r)

if err != nil {
return "", errors.Wrap(err, "creating project info")
}

WebsocketHandler handlers.WebsocketHandler
ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler
StatsScope stats.Scope
return projectInfo.String(), nil
}

type pullInfo struct {
Expand Down Expand Up @@ -132,37 +142,9 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request)
}

func (j *JobsController) getProjectJobsWS(w http.ResponseWriter, r *http.Request) error {
projectInfo, err := newProjectInfo(r)
if err != nil {
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return err
}

c, err := j.WebsocketHandler.Upgrade(w, r, nil)
if err != nil {
j.Logger.Warn("Failed to upgrade websocket: %s", err)
return err
}

// Buffer size set to 1000 to ensure messages get queued (upto 1000) if the receiverCh is not ready to
// receive messages before the channel is closed and resources cleaned up.
receiver := make(chan string, 1000)
j.WebsocketHandler.SetCloseHandler(c, receiver)

// Add a reader goroutine to listen for socket.close() events.
go j.WebsocketHandler.SetReadHandler(c)

pull := projectInfo.String()
err = j.ProjectCommandOutputHandler.Receive(pull, receiver, func(msg string) error {
if err := c.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
j.Logger.Warn("Failed to write ws message: %s", err)
return err
}
return nil
})
err := j.WsMux.Handle(w, r)

if err != nil {
j.Logger.Warn("Failed to receive message: %s", err)
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return err
}
Expand Down
47 changes: 0 additions & 47 deletions server/controllers/logstreaming_controller_test.go

This file was deleted.

63 changes: 63 additions & 0 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package websocket

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/logging"
)

// PartitionKeyGenerator generates partition keys for the multiplexor
type PartitionKeyGenerator interface {
Generate(r *http.Request) (string, error)
}

// PartitionRegistry is the registry holding each partition
// and is responsible for registering/deregistering new buffers
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
}

// Multiplexor is responsible for handling the data transfer between the storage layer
// and the registry. Note this is still a WIP as right now the registry is assumed to handle
// everything.
type Multiplexor struct {
writer *Writer
keyGenerator PartitionKeyGenerator
registry PartitionRegistry
}

func NewMultiplexor(log logging.SimpleLogging, keyGenerator PartitionKeyGenerator, registry PartitionRegistry) *Multiplexor {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Multiplexor{
writer: &Writer{
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the writer.NewWriter() method instead of instantiating the upgrader here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that method is unused. I'll remove it.

upgrader: upgrader,
log: log,
},
keyGenerator: keyGenerator,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to assign the registry to the multiplexor? Don't we have a linter that checks for these kind of errors where the passed argument is not used in the method body?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, classic. i don't think we have linters for that. Will fix.

registry: registry,
}
}

// Handle should be called for a given websocket request. It blocks
// while writing to the websocket until the buffer is closed.
func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
key, err := m.keyGenerator.Generate(r)

if err != nil {
return errors.Wrapf(err, "generating partition key")
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)

// spinning up a goroutine for this since we are attempting to block on the read side.
go m.registry.Register(key, buffer)
defer m.registry.Deregister(key, buffer)

return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key)
}
70 changes: 70 additions & 0 deletions server/controllers/websocket/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package websocket

import (
"net/http"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/runatlantis/atlantis/server/logging"
)

func NewWriter(log logging.SimpleLogging) *Writer {
upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Writer{
upgrader: upgrader,
log: log,
}
}

type Writer struct {
upgrader websocket.Upgrader

//TODO: Remove dependency on atlantis logger here if we upstream this.
log logging.SimpleLogging
}

func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan string) error {
conn, err := w.upgrader.Upgrade(rw, r, nil)

if err != nil {
return errors.Wrap(err, "upgrading websocket connection")
}

conn.SetCloseHandler(func(code int, text string) error {
// Close the channnel after websocket connection closed.
// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
// is it good practice to close at the receiver? Probably not, we should figure out a better
// way to handle this case
close(input)
return nil
})

// Add a reader goroutine to listen for socket.close() events.
go w.setReadHandler(conn)

// block on reading our input channel
for msg := range input {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
w.log.Warn("Failed to write ws message: %s", err)
return err
}
}

return nil
}

func (w *Writer) setReadHandler(c *websocket.Conn) {
for {
_, _, err := c.ReadMessage()
if err != nil {
// CloseGoingAway (1001) when a browser tab is closed.
// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
w.log.Warn("Failed to read WS message: %s", err)
}
return
}
}

}
2 changes: 1 addition & 1 deletion server/core/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (

"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/terraform/ansi"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
)

var LogStreamingValidCmds = [...]string{"init", "plan", "apply"}
Expand Down
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
version "github.com/hashicorp/go-version"
. "github.com/petergtz/pegomock"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/runatlantis/atlantis/server/core/terraform"
"github.com/runatlantis/atlantis/server/core/terraform/mocks"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
2 changes: 1 addition & 1 deletion server/events/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/vcs"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
"github.com/runatlantis/atlantis/server/recovery"
gitlab "github.com/xanzy/go-gitlab"
)
Expand Down
4 changes: 2 additions & 2 deletions server/events/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/models/fixtures"
vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
2 changes: 1 addition & 1 deletion server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/webhooks"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
)

// DirNotExistErr is an error caused by the directory not existing.
Expand Down
4 changes: 2 additions & 2 deletions server/events/project_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/runatlantis/atlantis/server/events/mocks/matchers"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/yaml/valid"
"github.com/runatlantis/atlantis/server/feature"
fmocks "github.com/runatlantis/atlantis/server/feature/mocks"
handlermocks "github.com/runatlantis/atlantis/server/handlers/mocks"
"github.com/runatlantis/atlantis/server/logging"
"github.com/runatlantis/atlantis/server/lyft/feature"
fmocks "github.com/runatlantis/atlantis/server/lyft/feature/mocks"
. "github.com/runatlantis/atlantis/testing"
)

Expand Down
Loading