From 233ca5d1120f398685dc7161567aba065a304f62 Mon Sep 17 00:00:00 2001
From: Aayush Gupta <43479002+Aayyush@users.noreply.github.com>
Date: Wed, 2 Feb 2022 15:28:45 -0800
Subject: [PATCH] Update log handler to close buffered channels when an operation is complete (#170)

---
 server/controllers/websocket/writer.go | 31 +------
 server/core/terraform/async_client.go | 4 +-
 server/events/project_command_runner.go | 12 ++-
 server/events/pull_closed_executor_test.go | 2 +-
 .../mock_project_command_output_handler.go | 20 +++-
 .../project_command_output_handler.go | 73 +++++++++++++---
 .../project_command_output_handler_test.go | 84 +++++++++++++++++--
 7 files changed, 165 insertions(+), 61 deletions(-)

diff --git a/server/controllers/websocket/writer.go b/server/controllers/websocket/writer.go
index eca3f6ae60..9058f69dd9 100644
--- a/server/controllers/websocket/writer.go
+++ b/server/controllers/websocket/writer.go
@@ -31,18 +31,6 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
 		return errors.Wrap(err, "upgrading websocket connection")
 	}

-	conn.SetCloseHandler(func(code int, text string) error {
-		// Close the channnel after websocket connection closed.
-		// Will gracefully exit the ProjectCommandOutputHandler.Register() call and cleanup.
-		// is it good practice to close at the receiver? Probably not, we should figure out a better
-		// way to handle this case
-		close(input)
-		return nil
-	})
-
-	// Add a reader goroutine to listen for socket.close() events.
-	go w.setReadHandler(conn)
-
 	// block on reading our input channel
 	for msg := range input {
 		if err := conn.WriteMessage(websocket.BinaryMessage, []byte("\r"+msg+"\n")); err != nil {
@@ -51,20 +39,9 @@ func (w *Writer) Write(rw http.ResponseWriter, r *http.Request, input chan strin
 		}
 	}

-	return nil
-}
-
-func (w *Writer) setReadHandler(c *websocket.Conn) {
-	for {
-		_, _, err := c.ReadMessage()
-		if err != nil {
-			// CloseGoingAway (1001) when a browser tab is closed.
-			// Expected behaviour since we have a CloseHandler(), log warning if not a CloseGoingAway
-			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
-				w.log.Warn("Failed to read WS message: %s", err)
-			}
-			return
-		}
+	// close ws conn after input channel is closed
+	if err = conn.Close(); err != nil {
+		w.log.Warn("Failed to close ws connection: %s", err)
 	}
-
+	return nil
 }
diff --git a/server/core/terraform/async_client.go b/server/core/terraform/async_client.go
index 2db3a88d5e..a94fbcae68 100644
--- a/server/core/terraform/async_client.go
+++ b/server/core/terraform/async_client.go
@@ -93,7 +93,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
 		for s.Scan() {
 			message := s.Text()
 			outCh <- Line{Line: message}
-			c.projectCmdOutputHandler.Send(ctx, message)
+			c.projectCmdOutputHandler.Send(ctx, message, false)
 		}
 		wg.Done()
 	}()
@@ -102,7 +102,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext,
 		for s.Scan() {
 			message := s.Text()
 			outCh <- Line{Line: message}
-			c.projectCmdOutputHandler.Send(ctx, message)
+			c.projectCmdOutputHandler.Send(ctx, message, false)
 		}
 		wg.Done()
 	}()
diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go
index 787ba1add7..33e0c445a4 100644
--- a/server/events/project_command_runner.go
+++ b/server/events/project_command_runner.go
@@ -29,6 +29,8 @@ import (
 	"github.com/runatlantis/atlantis/server/lyft/feature"
 )

+const OperationComplete = true
+
 // DirNotExistErr is an error caused by the directory not existing.
 type DirNotExistErr struct {
 	RepoRelDir string
@@ -124,13 +126,15 @@ type ProjectOutputWrapper struct {
 }

 func (p *ProjectOutputWrapper) Plan(ctx models.ProjectCommandContext) models.ProjectResult {
-	// Reset the buffer when running the plan. We only need to do this for plan,
-	// apply is a continuation of the same workflow
-	return p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
+	result := p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan)
+	p.ProjectCmdOutputHandler.Send(ctx, "", OperationComplete)
+	return result
 }

 func (p *ProjectOutputWrapper) Apply(ctx models.ProjectCommandContext) models.ProjectResult {
-	return p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
+	result := p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply)
+	p.ProjectCmdOutputHandler.Send(ctx, "", OperationComplete)
+	return result
 }

 func (p *ProjectOutputWrapper) updateProjectPRStatus(commandName models.CommandName, ctx models.ProjectCommandContext, execute func(ctx models.ProjectCommandContext) models.ProjectResult) models.ProjectResult {
diff --git a/server/events/pull_closed_executor_test.go b/server/events/pull_closed_executor_test.go
index a8bebe1a05..e3b1c200b4 100644
--- a/server/events/pull_closed_executor_test.go
+++ b/server/events/pull_closed_executor_test.go
@@ -210,7 +210,7 @@ func TestCleanUpLogStreaming(t *testing.T) {
 		}

 		go prjCmdOutHandler.Handle()
-		prjCmdOutHandler.Send(ctx, "Test Message")
+		prjCmdOutHandler.Send(ctx, "Test Message", false)

 		// Create boltdb and add pull request.
 		var lockBucket = "bucket"
diff --git a/server/handlers/mocks/mock_project_command_output_handler.go b/server/handlers/mocks/mock_project_command_output_handler.go
index b3ec5ee894..fa8fb149b6 100644
--- a/server/handlers/mocks/mock_project_command_output_handler.go
+++ b/server/handlers/mocks/mock_project_command_output_handler.go
@@ -58,11 +58,11 @@ func (mock *MockProjectCommandOutputHandler) Register(_param0 string, _param1 ch
 	pegomock.GetGenericMockFrom(mock).Invoke("Register", params, []reflect.Type{})
 }

-func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) {
+func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) {
 	if mock == nil {
 		panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().")
 	}
-	params := []pegomock.Param{_param0, _param1}
+	params := []pegomock.Param{_param0, _param1, _param2}
 	pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{})
 }

@@ -224,8 +224,8 @@ func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetAllCap
 	return
 }

-func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) *MockProjectCommandOutputHandler_Send_OngoingVerification {
-	params := []pegomock.Param{_param0, _param1}
+func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) *MockProjectCommandOutputHandler_Send_OngoingVerification {
+	params := []pegomock.Param{_param0, _param1, _param2}
 	methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout)
 	return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations}
 }
@@ -235,12 +235,12 @@ type MockProjectCommandOutputHandler_Send_OngoingVerification struct {
 	methodInvocations []pegomock.MethodInvocation
 }

-func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) {
-	_param0, _param1 := c.GetAllCapturedArguments()
-	return _param0[len(_param0)-1], _param1[len(_param1)-1]
+func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string, bool) {
+	_param0, _param1, _param2 := c.GetAllCapturedArguments()
+	return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1]
 }

-func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) {
+func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string, _param2 []bool) {
 	params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations)
 	if len(params) > 0 {
 		_param0 = make([]models.ProjectCommandContext, len(c.methodInvocations))
@@ -251,6 +251,10 @@ func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapture
 		for u, param := range params[1] {
 			_param1[u] = param.(string)
 		}
+		_param2 = make([]bool, len(c.methodInvocations))
+		for u, param := range params[2] {
+			_param2[u] = param.(bool)
+		}
 	}
 	return
 }
diff --git a/server/handlers/project_command_output_handler.go b/server/handlers/project_command_output_handler.go
index 57dc62dd27..44a8e9ca7e 100644
--- a/server/handlers/project_command_output_handler.go
+++ b/server/handlers/project_command_output_handler.go
@@ -7,6 +7,11 @@ import (
 	"github.com/runatlantis/atlantis/server/logging"
 )

+type OutputBuffer struct {
+	OperationComplete bool
+	Buffer            []string
+}
+
 type PullContext struct {
 	PullNum int
 	Repo string
@@ -24,7 +29,8 @@ type ProjectCmdOutputLine struct {

 	JobContext JobContext

-	Line string
+	Line              string
+	OperationComplete bool
 }

 // AsyncProjectCommandOutputHandler is a handler to transport terraform client
@@ -32,7 +38,7 @@ type ProjectCmdOutputLine struct {
 type AsyncProjectCommandOutputHandler struct {
 	projectCmdOutput chan *ProjectCmdOutputLine

-	projectOutputBuffers     map[string][]string
+	projectOutputBuffers     map[string]OutputBuffer
 	projectOutputBuffersLock sync.RWMutex

 	receiverBuffers map[string]map[chan string]bool
@@ -66,7 +72,7 @@ type ProjectStatusUpdater interface {

 type ProjectCommandOutputHandler interface {
 	// Send will enqueue the msg and wait for Handle() to receive the message.
-	Send(ctx models.ProjectCommandContext, msg string)
+	Send(ctx models.ProjectCommandContext, msg string, operationComplete bool)

 	// Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting
 	// to read the channel in the same goroutine
@@ -103,12 +109,12 @@ func NewAsyncProjectCommandOutputHandler(
 		receiverBuffers:        map[string]map[chan string]bool{},
 		projectStatusUpdater:   projectStatusUpdater,
 		projectJobURLGenerator: projectJobURLGenerator,
-		projectOutputBuffers:   map[string][]string{},
+		projectOutputBuffers:   map[string]OutputBuffer{},
 		pullToJobMapping:       sync.Map{},
 	}
 }

-func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
+func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) {
 	p.projectCmdOutput <- &ProjectCmdOutputLine{
 		JobID: ctx.JobID,
 		JobContext: JobContext{
@@ -120,7 +126,8 @@ func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext
 				Workspace: ctx.Workspace,
 			},
 		},
-		Line: msg,
+		Line:              msg,
+		OperationComplete: operationComplete,
 	}
 }

@@ -130,6 +137,11 @@ func (p *AsyncProjectCommandOutputHandler) Register(jobID string, receiver chan

 func (p *AsyncProjectCommandOutputHandler) Handle() {
 	for msg := range p.projectCmdOutput {
+		if msg.OperationComplete {
+			p.completeJob(msg.JobID)
+			continue
+		}
+
 		// Add job to pullToJob mapping
 		if _, ok := p.pullToJobMapping.Load(msg.JobContext.PullContext); !ok {
 			p.pullToJobMapping.Store(msg.JobContext.PullContext, map[string]bool{})
@@ -138,10 +150,34 @@ func (p *AsyncProjectCommandOutputHandler) Handle() {
 		jobMapping := value.(map[string]bool)
 		jobMapping[msg.JobID] = true

+		// Forward new message to all receiver channels and output buffer
 		p.writeLogLine(msg.JobID, msg.Line)
 	}
 }

+func (p *AsyncProjectCommandOutputHandler) completeJob(jobID string) {
+	p.projectOutputBuffersLock.Lock()
+	p.receiverBuffersLock.Lock()
+	defer func() {
+		p.projectOutputBuffersLock.Unlock()
+		p.receiverBuffersLock.Unlock()
+	}()
+
+	// Update operation status to complete
+	if outputBuffer, ok := p.projectOutputBuffers[jobID]; ok {
+		outputBuffer.OperationComplete = true
+		p.projectOutputBuffers[jobID] = outputBuffer
+	}
+
+	// Close active receiver channels
+	if openChannels, ok := p.receiverBuffers[jobID]; ok {
+		for ch := range openChannels {
+			close(ch)
+		}
+	}
+
+}
+
 func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error {
 	url, err := p.projectJobURLGenerator.GenerateProjectJobURL(ctx)

@@ -153,13 +189,19 @@ func (p *AsyncProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.Projec

 func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, jobID string) {
 	p.projectOutputBuffersLock.RLock()
-	buffer := p.projectOutputBuffers[jobID]
+	outputBuffer := p.projectOutputBuffers[jobID]
 	p.projectOutputBuffersLock.RUnlock()

-	for _, line := range buffer {
+	for _, line := range outputBuffer.Buffer {
 		ch <- line
 	}

+	// No need register receiver since all the logs have been streamed
+	if outputBuffer.OperationComplete {
+		close(ch)
+		return
+	}
+
 	// add the channel to our registry after we backfill the contents of the buffer,
 	// to prevent new messages coming in interleaving with this backfill.
 	p.receiverBuffersLock.Lock()
@@ -189,10 +231,15 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line strin
 	p.receiverBuffersLock.Unlock()

 	p.projectOutputBuffersLock.Lock()
-	if p.projectOutputBuffers[jobID] == nil {
-		p.projectOutputBuffers[jobID] = []string{}
+	if _, ok := p.projectOutputBuffers[jobID]; !ok {
+		p.projectOutputBuffers[jobID] = OutputBuffer{
+			Buffer: []string{},
+		}
 	}
-	p.projectOutputBuffers[jobID] = append(p.projectOutputBuffers[jobID], line)
+	outputBuffer := p.projectOutputBuffers[jobID]
+	outputBuffer.Buffer = append(outputBuffer.Buffer, line)
+	p.projectOutputBuffers[jobID] = outputBuffer
+
 	p.projectOutputBuffersLock.Unlock()
 }

@@ -208,7 +255,7 @@ func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(jobID string
 	return p.receiverBuffers[jobID]
 }

-func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) []string {
+func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) OutputBuffer {
 	return p.projectOutputBuffers[jobID]
 }

@@ -243,7 +290,7 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pullContext PullContext) {
 // NoopProjectOutputHandler is a mock that doesn't do anything
 type NoopProjectOutputHandler struct{}

-func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
+func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string, isOperationComplete bool) {
 }

 func (p *NoopProjectOutputHandler) Register(jobID string, receiver chan string) {}
diff --git a/server/handlers/project_command_output_handler_test.go b/server/handlers/project_command_output_handler_test.go
index 321ac65ff0..a4d734b71a 100644
--- a/server/handlers/project_command_output_handler_test.go
+++ b/server/handlers/project_command_output_handler_test.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"sync"
 	"testing"
+	"time"

"github.com/petergtz/pegomock" "github.com/runatlantis/atlantis/server/events/models" @@ -90,7 +91,7 @@ func TestProjectCommandOutputHandler(t *testing.T) { } }() - projectOutputHandler.Send(ctx, Msg) + projectOutputHandler.Send(ctx, Msg, false) wg.Wait() close(ch) @@ -103,7 +104,7 @@ func TestProjectCommandOutputHandler(t *testing.T) { projectOutputHandler := createProjectCommandOutputHandler(t) // send first message to populated the buffer - projectOutputHandler.Send(ctx, Msg) + projectOutputHandler.Send(ctx, Msg, false) ch := make(chan string) @@ -127,7 +128,7 @@ func TestProjectCommandOutputHandler(t *testing.T) { // before sending messages due to the way we lock our buffer memory cache projectOutputHandler.Register(ctx.JobID, ch) - projectOutputHandler.Send(ctx, Msg) + projectOutputHandler.Send(ctx, Msg, false) wg.Wait() close(ch) @@ -176,7 +177,6 @@ func TestProjectCommandOutputHandler(t *testing.T) { assert.Error(t, err) }) - // Close all jobs for a PR when clean up t.Run("clean up all jobs when PR is closed", func(t *testing.T) { var wg sync.WaitGroup projectOutputHandler := createProjectCommandOutputHandler(t) @@ -200,8 +200,8 @@ func TestProjectCommandOutputHandler(t *testing.T) { } }() - projectOutputHandler.Send(ctx, Msg) - projectOutputHandler.Send(ctx, "Complete") + projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, "Complete", false) pullContext := handlers.PullContext{ PullNum: ctx.Pull.Num, @@ -219,4 +219,76 @@ func TestProjectCommandOutputHandler(t *testing.T) { assert.Empty(t, dfProjectOutputHandler.GetReceiverBufferForPull(ctx.JobID)) assert.Empty(t, dfProjectOutputHandler.GetJobIdMapForPullContext(pullContext)) }) + + t.Run("mark operation status complete and close conn buffers for the job", func(t *testing.T) { + projectOutputHandler := createProjectCommandOutputHandler(t) + + ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + // read from channel + go func() { + for _ = range ch { + } + }() + + projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, "", true) + + // Wait for the handler to process the message + time.Sleep(10 * time.Millisecond) + + dfProjectOutputHandler, ok := projectOutputHandler.(*handlers.AsyncProjectCommandOutputHandler) + assert.True(t, ok) + + outputBuffer := dfProjectOutputHandler.GetProjectOutputBuffer(ctx.JobID) + assert.True(t, outputBuffer.OperationComplete) + + _, ok = (<-ch) + assert.False(t, ok) + + }) + + t.Run("close conn buffer after streaming logs for completed operation", func(t *testing.T) { + projectOutputHandler := createProjectCommandOutputHandler(t) + + ch := make(chan string) + + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + // read from channel + go func() { + for _ = range ch { + } + }() + + projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, "", true) + + // Wait for the handler to process the message + time.Sleep(10 * time.Millisecond) + + ch_2 := make(chan string) + opComplete := make(chan bool) + + // buffer channel will be closed 
+		go func() {
+			for _ = range ch_2 {
+			}
+			opComplete <- true
+		}()
+
+		projectOutputHandler.Register(ctx.JobID, ch_2)
+
+		assert.True(t, <-opComplete)
+	})
 }
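Reviewer note (not part of the patch above): the standalone Go sketch below models the channel lifecycle this change introduces, using simplified stand-in types rather than the real AsyncProjectCommandOutputHandler, models.ProjectCommandContext, or websocket wiring. A producer streams lines with Send(..., false) and signals completion with Send(..., true); the handler marks the job's buffer complete and closes every registered receiver channel, which is what lets a consumer's `for msg := range ch` loop (and Writer.Write's `for msg := range input`) exit so the websocket connection can be closed. A late subscriber gets the backfilled buffer followed by an immediately closed channel.

// channel_lifecycle_sketch.go - illustrative only; handler, send, and register
// are simplified stand-ins for the patched AsyncProjectCommandOutputHandler API.
package main

import (
	"fmt"
	"sync"
)

type outputBuffer struct {
	operationComplete bool
	lines             []string
}

type handler struct {
	mu        sync.Mutex
	buffers   map[string]outputBuffer
	receivers map[string][]chan string
}

func newHandler() *handler {
	return &handler{
		buffers:   map[string]outputBuffer{},
		receivers: map[string][]chan string{},
	}
}

// send mirrors the new Send(ctx, msg, operationComplete) contract: a completion
// message marks the buffer complete and closes receiver channels instead of
// appending another line.
func (h *handler) send(jobID, msg string, operationComplete bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	buf := h.buffers[jobID]
	if operationComplete {
		buf.operationComplete = true
		h.buffers[jobID] = buf
		for _, ch := range h.receivers[jobID] {
			close(ch) // ends each consumer's `for msg := range ch` loop
		}
		delete(h.receivers, jobID)
		return
	}
	buf.lines = append(buf.lines, msg)
	h.buffers[jobID] = buf
	for _, ch := range h.receivers[jobID] {
		ch <- msg
	}
}

// register mirrors Register/addChan: backfill the buffer first; if the
// operation already completed, close the channel rather than registering it.
func (h *handler) register(jobID string, ch chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, line := range h.buffers[jobID].lines {
		ch <- line
	}
	if h.buffers[jobID].operationComplete {
		close(ch)
		return
	}
	h.receivers[jobID] = append(h.receivers[jobID], ch)
}

func main() {
	h := newHandler()

	// Live subscriber: plays the role of websocket.Writer.Write.
	done := make(chan struct{})
	live := make(chan string)
	go func() {
		for line := range live { // exits once the handler closes the channel
			fmt.Println("ws write:", line)
		}
		fmt.Println("input channel closed -> close websocket conn")
		close(done)
	}()
	h.register("job-1", live)

	h.send("job-1", "terraform plan output line", false)
	h.send("job-1", "", true) // operation complete
	<-done

	// Late subscriber: gets the buffered backfill, then an already-closed channel.
	late := make(chan string, 16)
	h.register("job-1", late)
	for line := range late {
		fmt.Println("replay:", line)
	}
}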