Skip to content

Commit

Permalink
Update log handler to close buffered channels when an operation is co…
Browse files Browse the repository at this point in the history
…mplete (#170)
  • Loading branch information
Aayyush authored Feb 2, 2022
1 parent 7f2d9bf commit 233ca5d
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 61 deletions.
31 changes: 4 additions & 27 deletions server/controllers/websocket/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
return errors.Wrap(err, "upgrading websocket connection")
}

conn.SetCloseHandler(func(code int, text string) error {
// Close the channnel after websocket connection closed.
// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
// is it good practice to close at the receiver? Probably not, we should figure out a better
// way to handle this case
close(input)
return nil
})

// Add a reader goroutine to listen for socket.close() events.
go w.setReadHandler(conn)

// block on reading our input channel
for msg := range input {
if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
Expand All @@ -51,20 +39,9 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
}
}

return nil
}

func (w *Writer) setReadHandler(c *websocket.Conn) {
for {
_, _, err := c.ReadMessage()
if err != nil {
// CloseGoingAway (1001) when a browser tab is closed.
// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
w.log.Warn("Failed to read WS message: %s", err)
}
return
}
// close ws conn after input channel is closed
if err = conn.Close(); err != nil {
w.log.Warn("Failed to close ws connection: %s", err)
}

return nil
}
4 changes: 2 additions & 2 deletions server/core/terraform/async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message)
c.projectCmdOutputHandler.Send(ctx, message, false)
}
wg.Done()
}()
Expand All @@ -102,7 +102,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message)
c.projectCmdOutputHandler.Send(ctx, message, false)
}
wg.Done()
}()
Expand Down
12 changes: 8 additions & 4 deletions server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/runatlantis/atlantis/server/lyft/feature"
)

const OperationComplete = true

// DirNotExistErr is an error caused by the directory not existing.
type DirNotExistErr struct {
RepoRelDir string
Expand Down Expand Up @@ -124,13 +126,15 @@ type ProjectOutputWrapper struct {
}

func (p *ProjectOutputWrapper) Plan(ctx models.ProjectCommandContext) models.ProjectResult {
// Reset the buffer when running the plan. We only need to do this for plan,
// apply is a continuation of the same workflow
return p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
result := p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
p.ProjectCmdOutputHandler.Send(ctx, "", OperationComplete)
return result
}

func (p *ProjectOutputWrapper) Apply(ctx models.ProjectCommandContext) models.ProjectResult {
return p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
result := p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
p.ProjectCmdOutputHandler.Send(ctx, "", OperationComplete)
return result
}

func (p *ProjectOutputWrapper) updateProjectPRStatus(commandName models.CommandName, ctx models.ProjectCommandContext, execute func(ctx models.ProjectCommandContext) models.ProjectResult) models.ProjectResult {
Expand Down
2 changes: 1 addition & 1 deletion server/events/pull_closed_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestCleanUpLogStreaming(t *testing.T) {
}

go prjCmdOutHandler.Handle()
prjCmdOutHandler.Send(ctx, "Test Message")
prjCmdOutHandler.Send(ctx, "Test Message", false)

// Create boltdb and add pull request.
var lockBucket = "bucket"
Expand Down
20 changes: 12 additions & 8 deletions server/handlers/mocks/mock_project_command_output_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 60 additions & 13 deletions server/handlers/project_command_output_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"github.com/runatlantis/atlantis/server/logging"
)

type OutputBuffer struct {
OperationComplete bool
Buffer []string
}

type PullContext struct {
PullNum int
Repo string
Expand All @@ -24,15 +29,16 @@ type ProjectCmdOutputLine struct {

JobContext JobContext

Line string
Line string
OperationComplete bool
}

// AsyncProjectCommandOutputHandler is a handler to transport terraform client
// outputs to the front end.
type AsyncProjectCommandOutputHandler struct {
projectCmdOutput chan *ProjectCmdOutputLine

projectOutputBuffers map[string][]string
projectOutputBuffers map[string]OutputBuffer
projectOutputBuffersLock sync.RWMutex

receiverBuffers map[string]map[chan string]bool
Expand Down Expand Up @@ -66,7 +72,7 @@ type ProjectStatusUpdater interface {

type ProjectCommandOutputHandler interface {
// Send will enqueue the msg and wait for Handle() to receive the message.
Send(ctx models.ProjectCommandContext, msg string)
Send(ctx models.ProjectCommandContext, msg string, operationComplete bool)

// Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting
// to read the channel in the same goroutine
Expand Down Expand Up @@ -103,12 +109,12 @@ func NewAsyncProjectCommandOutputHandler(
receiverBuffers: map[string]map[chan string]bool{},
projectStatusUpdater: projectStatusUpdater,
projectJobURLGenerator: projectJobURLGenerator,
projectOutputBuffers: map[string][]string{},
projectOutputBuffers: map[string]OutputBuffer{},
pullToJobMapping: sync.Map{},
}
}

func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) {
p.projectCmdOutput <- &ProjectCmdOutputLine{
JobID: ctx.JobID,
JobContext: JobContext{
Expand All @@ -120,7 +126,8 @@ func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext
Workspace: ctx.Workspace,
},
},
Line: msg,
Line: msg,
OperationComplete: operationComplete,
}
}

Expand All @@ -130,6 +137,11 @@ func (p *AsyncProjectCommandOutputHandler) Register(jobID string, receiver chan

func (p *AsyncProjectCommandOutputHandler) Handle() {
for msg := range p.projectCmdOutput {
if msg.OperationComplete {
p.completeJob(msg.JobID)
continue
}

// Add job to pullToJob mapping
if _, ok := p.pullToJobMapping.Load(msg.JobContext.PullContext); !ok {
p.pullToJobMapping.Store(msg.JobContext.PullContext, map[string]bool{})
Expand All @@ -138,10 +150,34 @@ func (p *AsyncProjectCommandOutputHandler) Handle() {
jobMapping := value.(map[string]bool)
jobMapping[msg.JobID] = true

// Forward new message to all receiver channels and output buffer
p.writeLogLine(msg.JobID, msg.Line)
}
}

func (p *AsyncProjectCommandOutputHandler) completeJob(jobID string) {
p.projectOutputBuffersLock.Lock()
p.receiverBuffersLock.Lock()
defer func() {
p.projectOutputBuffersLock.Unlock()
p.receiverBuffersLock.Unlock()
}()

// Update operation status to complete
if outputBuffer, ok := p.projectOutputBuffers[jobID]; ok {
outputBuffer.OperationComplete = true
p.projectOutputBuffers[jobID] = outputBuffer
}

// Close active receiver channels
if openChannels, ok := p.receiverBuffers[jobID]; ok {
for ch := range openChannels {
close(ch)
}
}

}

func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error {
url, err := p.projectJobURLGenerator.GenerateProjectJobURL(ctx)

Expand All @@ -153,13 +189,19 @@ func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.Projec

func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, jobID string) {
p.projectOutputBuffersLock.RLock()
buffer := p.projectOutputBuffers[jobID]
outputBuffer := p.projectOutputBuffers[jobID]
p.projectOutputBuffersLock.RUnlock()

for _, line := range buffer {
for _, line := range outputBuffer.Buffer {
ch <- line
}

// No need register receiver since all the logs have been streamed
if outputBuffer.OperationComplete {
close(ch)
return
}

// add the channel to our registry after we backfill the contents of the buffer,
// to prevent new messages coming in interleaving with this backfill.
p.receiverBuffersLock.Lock()
Expand Down Expand Up @@ -189,10 +231,15 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line strin
p.receiverBuffersLock.Unlock()

p.projectOutputBuffersLock.Lock()
if p.projectOutputBuffers[jobID] == nil {
p.projectOutputBuffers[jobID] = []string{}
if _, ok := p.projectOutputBuffers[jobID]; !ok {
p.projectOutputBuffers[jobID] = OutputBuffer{
Buffer: []string{},
}
}
p.projectOutputBuffers[jobID] = append(p.projectOutputBuffers[jobID], line)
outputBuffer := p.projectOutputBuffers[jobID]
outputBuffer.Buffer = append(outputBuffer.Buffer, line)
p.projectOutputBuffers[jobID] = outputBuffer

p.projectOutputBuffersLock.Unlock()
}

Expand All @@ -208,7 +255,7 @@ func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(jobID string
return p.receiverBuffers[jobID]
}

func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) []string {
func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) OutputBuffer {
return p.projectOutputBuffers[jobID]
}

Expand Down Expand Up @@ -243,7 +290,7 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pullContext PullContext) {
// NoopProjectOutputHandler is a mock that doesn't do anything
type NoopProjectOutputHandler struct{}

func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string, isOperationComplete bool) {
}

func (p *NoopProjectOutputHandler) Register(jobID string, receiver chan string) {}
Expand Down
Loading

0 comments on commit 233ca5d

Please sign in to comment.