diff --git a/server/controllers/websocket/mux.go b/server/controllers/websocket/mux.go index 8288df3212..7d063779db 100644 --- a/server/controllers/websocket/mux.go +++ b/server/controllers/websocket/mux.go @@ -1,7 +1,6 @@ package websocket import ( - "fmt" "net/http" "github.com/gorilla/websocket" @@ -18,8 +17,6 @@ type PartitionKeyGenerator interface { // and is responsible for registering/deregistering new buffers type PartitionRegistry interface { Register(key string, buffer chan string) - Deregister(key string, buffer chan string) - IsKeyExists(key string) bool } // Multiplexor is responsible for handling the data transfer between the storage layer @@ -53,18 +50,15 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error { return errors.Wrapf(err, "generating partition key") } - // check if the job ID exists before registering receiver - if !m.registry.IsKeyExists(key) { - return fmt.Errorf("invalid key: %s", key) - } - // Buffer size set to 1000 to ensure messages get queued. // TODO: make buffer size configurable buffer := make(chan string, 1000) + // Note: Here we register the key without checking if the job exists because + // if the job DNE, the job is marked complete and we close the ws conn immediately + // spinning up a goroutine for this since we are attempting to block on the read side. go m.registry.Register(key, buffer) - defer m.registry.Deregister(key, buffer) return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key) } diff --git a/server/core/terraform/async_client.go b/server/core/terraform/async_client.go index f8fa74f568..76bb50ee2a 100644 --- a/server/core/terraform/async_client.go +++ b/server/core/terraform/async_client.go @@ -93,7 +93,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext, for s.Scan() { message := s.Text() outCh <- Line{Line: message} - c.projectCmdOutputHandler.Send(ctx, message, false) + c.projectCmdOutputHandler.Send(ctx, message) } wg.Done() }() @@ -102,7 +102,7 @@ func (c *AsyncClient) RunCommandAsyncWithInput(ctx models.ProjectCommandContext, for s.Scan() { message := s.Text() outCh <- Line{Line: message} - c.projectCmdOutputHandler.Send(ctx, message, false) + c.projectCmdOutputHandler.Send(ctx, message) } wg.Done() }() diff --git a/server/events/mocks/mock_job_closer.go b/server/events/mocks/mock_job_closer.go new file mode 100644 index 0000000000..32c97424ae --- /dev/null +++ b/server/events/mocks/mock_job_closer.go @@ -0,0 +1,97 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/events (interfaces: JobCloser) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + "reflect" + "time" +) + +type MockJobCloser struct { + fail func(message string, callerSkip ...int) +} + +func NewMockJobCloser(options ...pegomock.Option) *MockJobCloser { + mock := &MockJobCloser{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockJobCloser) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockJobCloser) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockJobCloser) CloseJob(_param0 string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobCloser().") + } + params := []pegomock.Param{_param0} + pegomock.GetGenericMockFrom(mock).Invoke("CloseJob", params, []reflect.Type{}) +} + +func (mock *MockJobCloser) VerifyWasCalledOnce() *VerifierMockJobCloser { + return &VerifierMockJobCloser{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockJobCloser) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockJobCloser { + return &VerifierMockJobCloser{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockJobCloser) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockJobCloser { + return &VerifierMockJobCloser{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockJobCloser) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockJobCloser { + return &VerifierMockJobCloser{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockJobCloser struct { + mock *MockJobCloser + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockJobCloser) CloseJob(_param0 string) *MockJobCloser_CloseJob_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CloseJob", params, verifier.timeout) + return &MockJobCloser_CloseJob_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobCloser_CloseJob_OngoingVerification struct { + mock *MockJobCloser + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobCloser_CloseJob_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *MockJobCloser_CloseJob_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} diff --git a/server/events/project_command_runner.go b/server/events/project_command_runner.go index 7e5a6dc404..f8a6b9bf6b 100644 --- a/server/events/project_command_runner.go +++ b/server/events/project_command_runner.go @@ -125,29 +125,30 @@ type JobURLSetter interface { SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error } -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_message_sender.go JobMessageSender +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_closer.go JobCloser -type JobMessageSender interface { - Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) +// Job Closer closes a job by marking op complete and clearing up buffers if logs are successfully persisted +type JobCloser interface { + CloseJob(jobID string) } // ProjectOutputWrapper is a decorator that creates a new PR status check per project. // The status contains a url that outputs current progress of the terraform plan/apply command. type ProjectOutputWrapper struct { ProjectCommandRunner - JobMessageSender JobMessageSender - JobURLSetter JobURLSetter + JobURLSetter JobURLSetter + JobCloser JobCloser } func (p *ProjectOutputWrapper) Plan(ctx models.ProjectCommandContext) models.ProjectResult { result := p.updateProjectPRStatus(models.PlanCommand, ctx, p.ProjectCommandRunner.Plan) - p.JobMessageSender.Send(ctx, "", OperationComplete) + p.JobCloser.CloseJob(ctx.JobID) return result } func (p *ProjectOutputWrapper) Apply(ctx models.ProjectCommandContext) models.ProjectResult { result := p.updateProjectPRStatus(models.ApplyCommand, ctx, p.ProjectCommandRunner.Apply) - p.JobMessageSender.Send(ctx, "", OperationComplete) + p.JobCloser.CloseJob(ctx.JobID) return result } diff --git a/server/events/project_command_runner_test.go b/server/events/project_command_runner_test.go index 4893eeca05..63461e96bb 100644 --- a/server/events/project_command_runner_test.go +++ b/server/events/project_command_runner_test.go @@ -189,12 +189,12 @@ func TestProjectOutputWrapper(t *testing.T) { var expCommitStatus models.CommitStatus mockJobURLSetter := eventmocks.NewMockJobURLSetter() - mockJobMessageSender := eventmocks.NewMockJobMessageSender() + mockJobCloser := eventmocks.NewMockJobCloser() mockProjectCommandRunner := mocks.NewMockProjectCommandRunner() runner := &events.ProjectOutputWrapper{ JobURLSetter: mockJobURLSetter, - JobMessageSender: mockJobMessageSender, + JobCloser: mockJobCloser, ProjectCommandRunner: mockProjectCommandRunner, } diff --git a/server/events/pull_closed_executor_test.go b/server/events/pull_closed_executor_test.go index 74ddc53d6d..4135f2d5ec 100644 --- a/server/events/pull_closed_executor_test.go +++ b/server/events/pull_closed_executor_test.go @@ -31,6 +31,7 @@ import ( "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/events/models/fixtures" vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks" + jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks" loggermocks "github.com/runatlantis/atlantis/server/logging/mocks" . "github.com/runatlantis/atlantis/testing" ) @@ -197,7 +198,8 @@ func TestCleanUpLogStreaming(t *testing.T) { // Create Log streaming resources prjCmdOutput := make(chan *jobs.ProjectCmdOutputLine) - prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger) + storageBackend := jobmocks.NewMockStorageBackend() + prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger, jobs.NewJobStore(storageBackend)) ctx := models.ProjectCommandContext{ BaseRepo: fixtures.GithubRepo, Pull: fixtures.Pull, @@ -206,7 +208,7 @@ func TestCleanUpLogStreaming(t *testing.T) { } go prjCmdOutHandler.Handle() - prjCmdOutHandler.Send(ctx, "Test Message", false) + prjCmdOutHandler.Send(ctx, "Test Message") // Create boltdb and add pull request. var lockBucket = "bucket" @@ -280,7 +282,7 @@ func TestCleanUpLogStreaming(t *testing.T) { // Assert log streaming resources are cleaned up. dfPrjCmdOutputHandler := prjCmdOutHandler.(*jobs.AsyncProjectCommandOutputHandler) - assert.Empty(t, dfPrjCmdOutputHandler.GetProjectOutputBuffer(ctx.PullInfo())) + assert.Empty(t, dfPrjCmdOutputHandler.GetJob(ctx.PullInfo()).Output) assert.Empty(t, dfPrjCmdOutputHandler.GetReceiverBufferForPull(ctx.PullInfo())) }) } diff --git a/server/jobs/job_store.go b/server/jobs/job_store.go new file mode 100644 index 0000000000..1712707777 --- /dev/null +++ b/server/jobs/job_store.go @@ -0,0 +1,147 @@ +package jobs + +import ( + "fmt" + "sync" + + "github.com/pkg/errors" +) + +type JobStatus int + +const ( + Processing JobStatus = iota + Complete +) + +type Job struct { + Output []string + Status JobStatus +} + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_store.go JobStore + +type JobStore interface { + // Gets the job from the in memory buffer, if available and if not, reaches to the storage backend + // Returns an empty job with error if not in storage backend + Get(jobID string) (Job, error) + + // Appends a given string to a job's output if the job is not complete yet + AppendOutput(jobID string, output string) error + + // Sets a job status to complete and triggers any associated workflow, + // e.g: if the status is complete, the job is flushed to the associated storage backend + SetJobCompleteStatus(jobID string, status JobStatus) error + + // Removes a job from the store + RemoveJob(jobID string) +} + +func NewJobStore(storageBackend StorageBackend) *LayeredJobStore { + return &LayeredJobStore{ + jobs: map[string]*Job{}, + storageBackend: storageBackend, + } +} + +// Setup job store for testing +func NewTestJobStore(storageBackend StorageBackend, jobs map[string]*Job) *LayeredJobStore { + return &LayeredJobStore{ + jobs: jobs, + storageBackend: storageBackend, + } +} + +// layeredJobStore is a job store with one or more than one layers of persistence +// storageBackend in this case +type LayeredJobStore struct { + jobs map[string]*Job + storageBackend StorageBackend + lock sync.RWMutex +} + +func (j *LayeredJobStore) Get(jobID string) (Job, error) { + // Get from memory if available + if job, ok := j.GetJobFromMemory(jobID); ok { + return job, nil + } + + // Get from storage backend if not in memory. + logs, err := j.storageBackend.Read(jobID) + if err != nil { + return Job{}, err + } + + // If read from storage backend, mark job complete so that the conn + // can be closed + return Job{ + Output: logs, + Status: Complete, + }, nil +} + +func (j *LayeredJobStore) GetJobFromMemory(jobID string) (Job, bool) { + j.lock.RLock() + defer j.lock.RUnlock() + + if j.jobs[jobID] == nil { + return Job{}, false + } + return *j.jobs[jobID], true +} + +func (j *LayeredJobStore) AppendOutput(jobID string, output string) error { + j.lock.Lock() + defer j.lock.Unlock() + + // Create new job if job dne + if j.jobs[jobID] == nil { + j.jobs[jobID] = &Job{} + } + + if j.jobs[jobID].Status == Complete { + return fmt.Errorf("cannot append to a complete job") + } + + updatedOutput := append(j.jobs[jobID].Output, output) + j.jobs[jobID].Output = updatedOutput + return nil +} + +func (j *LayeredJobStore) RemoveJob(jobID string) { + j.lock.Lock() + defer j.lock.Unlock() + + delete(j.jobs, jobID) +} + +func (j *LayeredJobStore) SetJobCompleteStatus(jobID string, status JobStatus) error { + j.lock.Lock() + defer j.lock.Unlock() + + // Error out when job dne + if j.jobs[jobID] == nil { + return fmt.Errorf("job: %s does not exist", jobID) + } + + // Error when job is already set to complete + if job := j.jobs[jobID]; job.Status == Complete { + return fmt.Errorf("job: %s is already complete", jobID) + } + + job := j.jobs[jobID] + job.Status = Complete + + // Persist to storage backend + ok, err := j.storageBackend.Write(jobID, job.Output) + if err != nil { + return errors.Wrapf(err, "error persisting job: %s", jobID) + } + + // Clear output buffers if successfully persisted + if ok { + delete(j.jobs, jobID) + } + + return nil +} diff --git a/server/jobs/job_store_test.go b/server/jobs/job_store_test.go new file mode 100644 index 0000000000..54316e7219 --- /dev/null +++ b/server/jobs/job_store_test.go @@ -0,0 +1,187 @@ +package jobs_test + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/runatlantis/atlantis/server/jobs" + "github.com/runatlantis/atlantis/server/jobs/mocks" + "github.com/runatlantis/atlantis/server/jobs/mocks/matchers" + "github.com/stretchr/testify/assert" + + . "github.com/petergtz/pegomock" + . "github.com/runatlantis/atlantis/testing" +) + +func TestJobStore_Get(t *testing.T) { + t.Run("load from memory", func(t *testing.T) { + // Setup job store + storageBackend := mocks.NewMockStorageBackend() + expectedJob := &jobs.Job{ + Output: []string{"a"}, + Status: jobs.Complete, + } + jobsMap := make(map[string]*jobs.Job) + jobsMap["1234"] = expectedJob + + jobStore := jobs.NewTestJobStore(storageBackend, jobsMap) + + // Assert job + gotJob, err := jobStore.Get("1234") + assert.NoError(t, err) + assert.Equal(t, expectedJob.Output, gotJob.Output) + assert.Equal(t, expectedJob.Status, gotJob.Status) + }) + + t.Run("load from storage backend when not in memory", func(t *testing.T) { + // Setup job store + storageBackend := mocks.NewMockStorageBackend() + expectedLogs := []string{"a", "b"} + expectedJob := jobs.Job{ + Output: expectedLogs, + Status: jobs.Complete, + } + When(storageBackend.Read(AnyString())).ThenReturn(expectedLogs, nil) + + // Assert job + jobStore := jobs.NewJobStore(storageBackend) + gotJob, err := jobStore.Get("1234") + assert.NoError(t, err) + assert.Equal(t, expectedJob.Output, gotJob.Output) + assert.Equal(t, expectedJob.Status, gotJob.Status) + }) + + t.Run("error when reading from storage backend fails", func(t *testing.T) { + // Setup job store + storageBackend := mocks.NewMockStorageBackend() + expectedError := fmt.Errorf("error") + When(storageBackend.Read(AnyString())).ThenReturn([]string{}, expectedError) + + // Assert job + jobStore := jobs.NewJobStore(storageBackend) + gotJob, err := jobStore.Get("1234") + assert.Empty(t, gotJob) + assert.ErrorIs(t, expectedError, err) + }) +} + +func TestJobStore_AppendOutput(t *testing.T) { + + t.Run("append output when new job", func(t *testing.T) { + // Setup job store + storageBackend := mocks.NewMockStorageBackend() + jobStore := jobs.NewJobStore(storageBackend) + jobID := "1234" + output := "Test log message" + + jobStore.AppendOutput(jobID, output) + + // Assert job + job, err := jobStore.Get(jobID) + Ok(t, err) + assert.Equal(t, job.Output, []string{output}) + assert.Equal(t, job.Status, jobs.Processing) + }) + + t.Run("append output when existing job", func(t *testing.T) { + // Setup job store + storageBackend := mocks.NewMockStorageBackend() + jobStore := jobs.NewJobStore(storageBackend) + jobID := "1234" + output := []string{"Test log message", "Test log message 2"} + + jobStore.AppendOutput(jobID, output[0]) + jobStore.AppendOutput(jobID, output[1]) + + // Assert job + job, err := jobStore.Get(jobID) + Ok(t, err) + assert.Equal(t, job.Output, output) + assert.Equal(t, job.Status, jobs.Processing) + }) + + t.Run("error when job status complete", func(t *testing.T) { + // Setup job store + storageBackend := mocks.NewMockStorageBackend() + jobID := "1234" + job := &jobs.Job{ + Output: []string{"a"}, + Status: jobs.Complete, + } + + // Add complete to job in store + jobsMap := make(map[string]*jobs.Job) + jobsMap[jobID] = job + + jobStore := jobs.NewTestJobStore(storageBackend, jobsMap) + + // Assert error + err := jobStore.AppendOutput(jobID, "test message") + assert.Error(t, err) + }) +} + +func TestJobStore_UpdateJobStatus(t *testing.T) { + + t.Run("retain job in memory when persist fails", func(t *testing.T) { + // Create new job and add it to store + jobID := "1234" + job := &jobs.Job{ + Output: []string{"a"}, + Status: jobs.Processing, + } + jobsMap := make(map[string]*jobs.Job) + jobsMap[jobID] = job + storageBackendErr := fmt.Errorf("random error") + expecterErr := errors.Wrapf(storageBackendErr, "error persisting job: %s", jobID) + + // Setup storage backend + storageBackend := mocks.NewMockStorageBackend() + When(storageBackend.Write(AnyString(), matchers.AnySliceOfString())).ThenReturn(false, storageBackendErr) + jobStore := jobs.NewTestJobStore(storageBackend, jobsMap) + err := jobStore.SetJobCompleteStatus(jobID, jobs.Complete) + + // Assert storage backend error + assert.EqualError(t, err, expecterErr.Error()) + + // Assert the job is in memory + jobInMem, err := jobStore.Get(jobID) + Ok(t, err) + assert.Equal(t, jobInMem.Output, job.Output) + assert.Equal(t, job.Status, jobs.Complete) + }) + + t.Run("delete from memory when persist succeeds", func(t *testing.T) { + // Create new job and add it to store + jobID := "1234" + job := &jobs.Job{ + Output: []string{"a"}, + Status: jobs.Processing, + } + jobsMap := make(map[string]*jobs.Job) + jobsMap[jobID] = job + + // Setup storage backend + storageBackend := mocks.NewMockStorageBackend() + When(storageBackend.Write(AnyString(), matchers.AnySliceOfString())).ThenReturn(true, nil) + + jobStore := jobs.NewTestJobStore(storageBackend, jobsMap) + err := jobStore.SetJobCompleteStatus(jobID, jobs.Complete) + assert.Nil(t, err) + + _, ok := jobStore.GetJobFromMemory(jobID) + assert.False(t, ok) + }) + + t.Run("error when job does not exist", func(t *testing.T) { + storageBackend := mocks.NewMockStorageBackend() + jobStore := jobs.NewJobStore(storageBackend) + jobID := "1234" + expectedErrString := fmt.Sprintf("job: %s does not exist", jobID) + + err := jobStore.SetJobCompleteStatus(jobID, jobs.Complete) + assert.EqualError(t, err, expectedErrString) + + }) +} diff --git a/server/jobs/mocks/matchers/io_readcloser.go b/server/jobs/mocks/matchers/io_readcloser.go new file mode 100644 index 0000000000..abba94c9cd --- /dev/null +++ b/server/jobs/mocks/matchers/io_readcloser.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + io "io" +) + +func AnyIoReadCloser() io.ReadCloser { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(io.ReadCloser))(nil)).Elem())) + var nullValue io.ReadCloser + return nullValue +} + +func EqIoReadCloser(value io.ReadCloser) io.ReadCloser { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue io.ReadCloser + return nullValue +} + +func NotEqIoReadCloser(value io.ReadCloser) io.ReadCloser { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue io.ReadCloser + return nullValue +} + +func IoReadCloserThat(matcher pegomock.ArgumentMatcher) io.ReadCloser { + pegomock.RegisterMatcher(matcher) + var nullValue io.ReadCloser + return nullValue +} diff --git a/server/jobs/mocks/matchers/jobs_job.go b/server/jobs/mocks/matchers/jobs_job.go new file mode 100644 index 0000000000..bc3112a1d3 --- /dev/null +++ b/server/jobs/mocks/matchers/jobs_job.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + jobs "github.com/runatlantis/atlantis/server/jobs" +) + +func AnyJobsJob() jobs.Job { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(jobs.Job))(nil)).Elem())) + var nullValue jobs.Job + return nullValue +} + +func EqJobsJob(value jobs.Job) jobs.Job { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue jobs.Job + return nullValue +} + +func NotEqJobsJob(value jobs.Job) jobs.Job { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue jobs.Job + return nullValue +} + +func JobsJobThat(matcher pegomock.ArgumentMatcher) jobs.Job { + pegomock.RegisterMatcher(matcher) + var nullValue jobs.Job + return nullValue +} diff --git a/server/jobs/mocks/matchers/jobs_jobstatus.go b/server/jobs/mocks/matchers/jobs_jobstatus.go new file mode 100644 index 0000000000..2d56719cdc --- /dev/null +++ b/server/jobs/mocks/matchers/jobs_jobstatus.go @@ -0,0 +1,33 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" + + jobs "github.com/runatlantis/atlantis/server/jobs" +) + +func AnyJobsJobStatus() jobs.JobStatus { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(jobs.JobStatus))(nil)).Elem())) + var nullValue jobs.JobStatus + return nullValue +} + +func EqJobsJobStatus(value jobs.JobStatus) jobs.JobStatus { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue jobs.JobStatus + return nullValue +} + +func NotEqJobsJobStatus(value jobs.JobStatus) jobs.JobStatus { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue jobs.JobStatus + return nullValue +} + +func JobsJobStatusThat(matcher pegomock.ArgumentMatcher) jobs.JobStatus { + pegomock.RegisterMatcher(matcher) + var nullValue jobs.JobStatus + return nullValue +} diff --git a/server/jobs/mocks/matchers/slice_of_string.go b/server/jobs/mocks/matchers/slice_of_string.go new file mode 100644 index 0000000000..f9281819dd --- /dev/null +++ b/server/jobs/mocks/matchers/slice_of_string.go @@ -0,0 +1,31 @@ +// Code generated by pegomock. DO NOT EDIT. +package matchers + +import ( + "github.com/petergtz/pegomock" + "reflect" +) + +func AnySliceOfString() []string { + pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*([]string))(nil)).Elem())) + var nullValue []string + return nullValue +} + +func EqSliceOfString(value []string) []string { + pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) + var nullValue []string + return nullValue +} + +func NotEqSliceOfString(value []string) []string { + pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) + var nullValue []string + return nullValue +} + +func SliceOfStringThat(matcher pegomock.ArgumentMatcher) []string { + pegomock.RegisterMatcher(matcher) + var nullValue []string + return nullValue +} diff --git a/server/jobs/mocks/mock_job_store.go b/server/jobs/mocks/mock_job_store.go new file mode 100644 index 0000000000..e17b09e4d1 --- /dev/null +++ b/server/jobs/mocks/mock_job_store.go @@ -0,0 +1,236 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/jobs (interfaces: JobStore) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + jobs "github.com/runatlantis/atlantis/server/jobs" + "reflect" + "time" +) + +type MockJobStore struct { + fail func(message string, callerSkip ...int) +} + +func NewMockJobStore(options ...pegomock.Option) *MockJobStore { + mock := &MockJobStore{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockJobStore) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockJobStore) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockJobStore) AppendOutput(_param0 string, _param1 string) error { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobStore().") + } + params := []pegomock.Param{_param0, _param1} + result := pegomock.GetGenericMockFrom(mock).Invoke("AppendOutput", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(error) + } + } + return ret0 +} + +func (mock *MockJobStore) Get(_param0 string) (jobs.Job, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobStore().") + } + params := []pegomock.Param{_param0} + result := pegomock.GetGenericMockFrom(mock).Invoke("Get", params, []reflect.Type{reflect.TypeOf((*jobs.Job)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 jobs.Job + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(jobs.Job) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockJobStore) RemoveJob(_param0 string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobStore().") + } + params := []pegomock.Param{_param0} + pegomock.GetGenericMockFrom(mock).Invoke("RemoveJob", params, []reflect.Type{}) +} + +func (mock *MockJobStore) SetJobCompleteStatus(_param0 string, _param1 jobs.JobStatus) error { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockJobStore().") + } + params := []pegomock.Param{_param0, _param1} + result := pegomock.GetGenericMockFrom(mock).Invoke("SetJobCompleteStatus", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(error) + } + } + return ret0 +} + +func (mock *MockJobStore) VerifyWasCalledOnce() *VerifierMockJobStore { + return &VerifierMockJobStore{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockJobStore) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockJobStore { + return &VerifierMockJobStore{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockJobStore) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockJobStore { + return &VerifierMockJobStore{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockJobStore) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockJobStore { + return &VerifierMockJobStore{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockJobStore struct { + mock *MockJobStore + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockJobStore) AppendOutput(_param0 string, _param1 string) *MockJobStore_AppendOutput_OngoingVerification { + params := []pegomock.Param{_param0, _param1} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "AppendOutput", params, verifier.timeout) + return &MockJobStore_AppendOutput_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobStore_AppendOutput_OngoingVerification struct { + mock *MockJobStore + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobStore_AppendOutput_OngoingVerification) GetCapturedArguments() (string, string) { + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] +} + +func (c *MockJobStore_AppendOutput_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(string) + } + } + return +} + +func (verifier *VerifierMockJobStore) Get(_param0 string) *MockJobStore_Get_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Get", params, verifier.timeout) + return &MockJobStore_Get_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobStore_Get_OngoingVerification struct { + mock *MockJobStore + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobStore_Get_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *MockJobStore_Get_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierMockJobStore) RemoveJob(_param0 string) *MockJobStore_RemoveJob_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "RemoveJob", params, verifier.timeout) + return &MockJobStore_RemoveJob_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobStore_RemoveJob_OngoingVerification struct { + mock *MockJobStore + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobStore_RemoveJob_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *MockJobStore_RemoveJob_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierMockJobStore) SetJobCompleteStatus(_param0 string, _param1 jobs.JobStatus) *MockJobStore_SetJobCompleteStatus_OngoingVerification { + params := []pegomock.Param{_param0, _param1} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SetJobCompleteStatus", params, verifier.timeout) + return &MockJobStore_SetJobCompleteStatus_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockJobStore_SetJobCompleteStatus_OngoingVerification struct { + mock *MockJobStore + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockJobStore_SetJobCompleteStatus_OngoingVerification) GetCapturedArguments() (string, jobs.JobStatus) { + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] +} + +func (c *MockJobStore_SetJobCompleteStatus_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []jobs.JobStatus) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([]jobs.JobStatus, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.(jobs.JobStatus) + } + } + return +} diff --git a/server/jobs/mocks/mock_project_command_output_handler.go b/server/jobs/mocks/mock_project_command_output_handler.go index 32b6ed76a8..a0afc55625 100644 --- a/server/jobs/mocks/mock_project_command_output_handler.go +++ b/server/jobs/mocks/mock_project_command_output_handler.go @@ -34,12 +34,12 @@ func (mock *MockProjectCommandOutputHandler) CleanUp(_param0 jobs.PullInfo) { pegomock.GetGenericMockFrom(mock).Invoke("CleanUp", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Deregister(_param0 string, _param1 chan string) { +func (mock *MockProjectCommandOutputHandler) CloseJob(_param0 string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1} - pegomock.GetGenericMockFrom(mock).Invoke("Deregister", params, []reflect.Type{}) + params := []pegomock.Param{_param0} + pegomock.GetGenericMockFrom(mock).Invoke("CloseJob", params, []reflect.Type{}) } func (mock *MockProjectCommandOutputHandler) Handle() { @@ -50,21 +50,6 @@ func (mock *MockProjectCommandOutputHandler) Handle() { pegomock.GetGenericMockFrom(mock).Invoke("Handle", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) IsKeyExists(_param0 string) bool { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") - } - params := []pegomock.Param{_param0} - result := pegomock.GetGenericMockFrom(mock).Invoke("IsKeyExists", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem()}) - var ret0 bool - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(bool) - } - } - return ret0 -} - func (mock *MockProjectCommandOutputHandler) Register(_param0 string, _param1 chan string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") @@ -73,11 +58,11 @@ func (mock *MockProjectCommandOutputHandler) Register(_param0 string, _param1 ch pegomock.GetGenericMockFrom(mock).Invoke("Register", params, []reflect.Type{}) } -func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) { +func (mock *MockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockProjectCommandOutputHandler().") } - params := []pegomock.Param{_param0, _param1, _param2} + params := []pegomock.Param{_param0, _param1} pegomock.GetGenericMockFrom(mock).Invoke("Send", params, []reflect.Type{}) } @@ -145,33 +130,29 @@ func (c *MockProjectCommandOutputHandler_CleanUp_OngoingVerification) GetAllCapt return } -func (verifier *VerifierMockProjectCommandOutputHandler) Deregister(_param0 string, _param1 chan string) *MockProjectCommandOutputHandler_Deregister_OngoingVerification { - params := []pegomock.Param{_param0, _param1} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Deregister", params, verifier.timeout) - return &MockProjectCommandOutputHandler_Deregister_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +func (verifier *VerifierMockProjectCommandOutputHandler) CloseJob(_param0 string) *MockProjectCommandOutputHandler_CloseJob_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "CloseJob", params, verifier.timeout) + return &MockProjectCommandOutputHandler_CloseJob_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } -type MockProjectCommandOutputHandler_Deregister_OngoingVerification struct { +type MockProjectCommandOutputHandler_CloseJob_OngoingVerification struct { mock *MockProjectCommandOutputHandler methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Deregister_OngoingVerification) GetCapturedArguments() (string, chan string) { - _param0, _param1 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1] +func (c *MockProjectCommandOutputHandler_CloseJob_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] } -func (c *MockProjectCommandOutputHandler_Deregister_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 []chan string) { +func (c *MockProjectCommandOutputHandler_CloseJob_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]string, len(c.methodInvocations)) for u, param := range params[0] { _param0[u] = param.(string) } - _param1 = make([]chan string, len(c.methodInvocations)) - for u, param := range params[1] { - _param1[u] = param.(chan string) - } } return } @@ -193,33 +174,6 @@ func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetCaptured func (c *MockProjectCommandOutputHandler_Handle_OngoingVerification) GetAllCapturedArguments() { } -func (verifier *VerifierMockProjectCommandOutputHandler) IsKeyExists(_param0 string) *MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "IsKeyExists", params, verifier.timeout) - return &MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification struct { - mock *MockProjectCommandOutputHandler - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification) GetCapturedArguments() string { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockProjectCommandOutputHandler_IsKeyExists_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]string, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(string) - } - } - return -} - func (verifier *VerifierMockProjectCommandOutputHandler) Register(_param0 string, _param1 chan string) *MockProjectCommandOutputHandler_Register_OngoingVerification { params := []pegomock.Param{_param0, _param1} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Register", params, verifier.timeout) @@ -251,8 +205,8 @@ func (c *MockProjectCommandOutputHandler_Register_OngoingVerification) GetAllCap return } -func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string, _param2 bool) *MockProjectCommandOutputHandler_Send_OngoingVerification { - params := []pegomock.Param{_param0, _param1, _param2} +func (verifier *VerifierMockProjectCommandOutputHandler) Send(_param0 models.ProjectCommandContext, _param1 string) *MockProjectCommandOutputHandler_Send_OngoingVerification { + params := []pegomock.Param{_param0, _param1} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Send", params, verifier.timeout) return &MockProjectCommandOutputHandler_Send_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} } @@ -262,12 +216,12 @@ type MockProjectCommandOutputHandler_Send_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string, bool) { - _param0, _param1, _param2 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetCapturedArguments() (models.ProjectCommandContext, string) { + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] } -func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string, _param2 []bool) { +func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapturedArguments() (_param0 []models.ProjectCommandContext, _param1 []string) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { _param0 = make([]models.ProjectCommandContext, len(c.methodInvocations)) @@ -278,10 +232,6 @@ func (c *MockProjectCommandOutputHandler_Send_OngoingVerification) GetAllCapture for u, param := range params[1] { _param1[u] = param.(string) } - _param2 = make([]bool, len(c.methodInvocations)) - for u, param := range params[2] { - _param2[u] = param.(bool) - } } return } diff --git a/server/jobs/mocks/mock_storage_backend.go b/server/jobs/mocks/mock_storage_backend.go new file mode 100644 index 0000000000..ad66535c4f --- /dev/null +++ b/server/jobs/mocks/mock_storage_backend.go @@ -0,0 +1,158 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/runatlantis/atlantis/server/jobs (interfaces: StorageBackend) + +package mocks + +import ( + pegomock "github.com/petergtz/pegomock" + "reflect" + "time" +) + +type MockStorageBackend struct { + fail func(message string, callerSkip ...int) +} + +func NewMockStorageBackend(options ...pegomock.Option) *MockStorageBackend { + mock := &MockStorageBackend{} + for _, option := range options { + option.Apply(mock) + } + return mock +} + +func (mock *MockStorageBackend) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } +func (mock *MockStorageBackend) FailHandler() pegomock.FailHandler { return mock.fail } + +func (mock *MockStorageBackend) Read(_param0 string) ([]string, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockStorageBackend().") + } + params := []pegomock.Param{_param0} + result := pegomock.GetGenericMockFrom(mock).Invoke("Read", params, []reflect.Type{reflect.TypeOf((*[]string)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 []string + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].([]string) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockStorageBackend) Write(_param0 string, _param1 []string) (bool, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockStorageBackend().") + } + params := []pegomock.Param{_param0, _param1} + result := pegomock.GetGenericMockFrom(mock).Invoke("Write", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 bool + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(bool) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockStorageBackend) VerifyWasCalledOnce() *VerifierMockStorageBackend { + return &VerifierMockStorageBackend{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockStorageBackend) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockStorageBackend { + return &VerifierMockStorageBackend{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockStorageBackend) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockStorageBackend { + return &VerifierMockStorageBackend{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockStorageBackend) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockStorageBackend { + return &VerifierMockStorageBackend{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierMockStorageBackend struct { + mock *MockStorageBackend + invocationCountMatcher pegomock.InvocationCountMatcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierMockStorageBackend) Read(_param0 string) *MockStorageBackend_Read_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Read", params, verifier.timeout) + return &MockStorageBackend_Read_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockStorageBackend_Read_OngoingVerification struct { + mock *MockStorageBackend + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockStorageBackend_Read_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *MockStorageBackend_Read_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierMockStorageBackend) Write(_param0 string, _param1 []string) *MockStorageBackend_Write_OngoingVerification { + params := []pegomock.Param{_param0, _param1} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Write", params, verifier.timeout) + return &MockStorageBackend_Write_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type MockStorageBackend_Write_OngoingVerification struct { + mock *MockStorageBackend + methodInvocations []pegomock.MethodInvocation +} + +func (c *MockStorageBackend_Write_OngoingVerification) GetCapturedArguments() (string, []string) { + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] +} + +func (c *MockStorageBackend_Write_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 [][]string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(c.methodInvocations)) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([][]string, len(c.methodInvocations)) + for u, param := range params[1] { + _param1[u] = param.([]string) + } + } + return +} diff --git a/server/jobs/project_command_output_handler.go b/server/jobs/project_command_output_handler.go index 8a1d287b33..49fb9c4632 100644 --- a/server/jobs/project_command_output_handler.go +++ b/server/jobs/project_command_output_handler.go @@ -1,6 +1,7 @@ package jobs import ( + "fmt" "sync" "github.com/runatlantis/atlantis/server/events/models" @@ -25,72 +26,63 @@ type JobInfo struct { } type ProjectCmdOutputLine struct { - JobID string - JobInfo JobInfo - Line string - OperationComplete bool -} - -// AsyncProjectCommandOutputHandler is a handler to transport terraform client -// outputs to the front end. -type AsyncProjectCommandOutputHandler struct { - projectCmdOutput chan *ProjectCmdOutputLine - - projectOutputBuffers map[string]OutputBuffer - projectOutputBuffersLock sync.RWMutex - - receiverBuffers map[string]map[chan string]bool - receiverBuffersLock sync.RWMutex - - logger logging.SimpleLogging - - // Tracks all the jobs for a pull request which is used for clean up after a pull request is closed. - pullToJobMapping sync.Map + JobID string + JobInfo JobInfo + Line string } //go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_project_command_output_handler.go ProjectCommandOutputHandler type ProjectCommandOutputHandler interface { // Send will enqueue the msg and wait for Handle() to receive the message. - Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) + Send(ctx models.ProjectCommandContext, msg string) + + // Listens for msg from channel + Handle() // Register registers a channel and blocks until it is caught up. Callers should call this asynchronously when attempting // to read the channel in the same goroutine Register(jobID string, receiver chan string) - // Deregister removes a channel from successive updates and closes it. - Deregister(jobID string, receiver chan string) + // Cleans up resources for a pull + CleanUp(pullInfo PullInfo) - IsKeyExists(key string) bool + // Persists job to storage backend and marks operation complete + CloseJob(jobID string) +} - // Listens for msg from channel - Handle() +// AsyncProjectCommandOutputHandler is a handler to transport terraform client +// outputs to the front end. +type AsyncProjectCommandOutputHandler struct { + // Main channel that receives output from the terraform client + projectCmdOutput chan *ProjectCmdOutputLine - // Cleans up resources for a pull - CleanUp(pullInfo PullInfo) + // Storage for jobs + jobStore JobStore + + // Registry to track active connections for a job + receiverRegistry ReceiverRegistry + + // Map to track jobs in a pull request + pullToJobMapping sync.Map + logger logging.SimpleLogging } func NewAsyncProjectCommandOutputHandler( projectCmdOutput chan *ProjectCmdOutputLine, logger logging.SimpleLogging, + jobStore JobStore, ) ProjectCommandOutputHandler { return &AsyncProjectCommandOutputHandler{ - projectCmdOutput: projectCmdOutput, - logger: logger, - receiverBuffers: map[string]map[chan string]bool{}, - projectOutputBuffers: map[string]OutputBuffer{}, - pullToJobMapping: sync.Map{}, + projectCmdOutput: projectCmdOutput, + logger: logger, + pullToJobMapping: sync.Map{}, + jobStore: jobStore, + receiverRegistry: NewReceiverRegistry(), } } -func (p *AsyncProjectCommandOutputHandler) IsKeyExists(key string) bool { - p.projectOutputBuffersLock.RLock() - defer p.projectOutputBuffersLock.RUnlock() - _, ok := p.projectOutputBuffers[key] - return ok -} - -func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) { +func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { p.projectCmdOutput <- &ProjectCmdOutputLine{ JobID: ctx.JobID, JobInfo: JobInfo{ @@ -102,21 +94,12 @@ func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext Workspace: ctx.Workspace, }, }, - Line: msg, - OperationComplete: operationComplete, + Line: msg, } } -func (p *AsyncProjectCommandOutputHandler) Register(jobID string, receiver chan string) { - p.addChan(receiver, jobID) -} - func (p *AsyncProjectCommandOutputHandler) Handle() { for msg := range p.projectCmdOutput { - if msg.OperationComplete { - p.completeJob(msg.JobID) - continue - } // Add job to pullToJob mapping if _, ok := p.pullToJobMapping.Load(msg.JobInfo.PullInfo); !ok { @@ -126,99 +109,76 @@ func (p *AsyncProjectCommandOutputHandler) Handle() { jobMapping := value.(map[string]bool) jobMapping[msg.JobID] = true - // Forward new message to all receiver channels and output buffer - p.writeLogLine(msg.JobID, msg.Line) - } -} - -func (p *AsyncProjectCommandOutputHandler) completeJob(jobID string) { - p.projectOutputBuffersLock.Lock() - p.receiverBuffersLock.Lock() - defer func() { - p.projectOutputBuffersLock.Unlock() - p.receiverBuffersLock.Unlock() - }() - - // Update operation status to complete - if outputBuffer, ok := p.projectOutputBuffers[jobID]; ok { - outputBuffer.OperationComplete = true - p.projectOutputBuffers[jobID] = outputBuffer - } - - // Close active receiver channels - if openChannels, ok := p.receiverBuffers[jobID]; ok { - for ch := range openChannels { - close(ch) + // Write logs to all active connections + for ch := range p.receiverRegistry.GetReceivers(msg.JobID) { + select { + case ch <- msg.Line: + default: + p.receiverRegistry.RemoveReceiver(msg.JobID, ch) + } } - } + // Append new log to the output buffer for the job + p.jobStore.AppendOutput(msg.JobID, msg.Line) + } } -func (p *AsyncProjectCommandOutputHandler) addChan(ch chan string, jobID string) { - p.projectOutputBuffersLock.RLock() - outputBuffer := p.projectOutputBuffers[jobID] - p.projectOutputBuffersLock.RUnlock() +func (p *AsyncProjectCommandOutputHandler) Register(jobID string, connection chan string) { + job, err := p.jobStore.Get(jobID) + if err != nil { + p.logger.Err(fmt.Sprintf("getting job: %s", jobID), err) + return + } - for _, line := range outputBuffer.Buffer { - ch <- line + // Back fill contents from the output buffer + for _, line := range job.Output { + connection <- line } - // No need register receiver since all the logs have been streamed - if outputBuffer.OperationComplete { - close(ch) + // Close connection if job is complete + if job.Status == Complete { + close(connection) return } - // add the channel to our registry after we backfill the contents of the buffer, - // to prevent new messages coming in interleaving with this backfill. - p.receiverBuffersLock.Lock() - if p.receiverBuffers[jobID] == nil { - p.receiverBuffers[jobID] = map[chan string]bool{} - } - p.receiverBuffers[jobID][ch] = true - p.receiverBuffersLock.Unlock() + // add receiver to registry after backfilling contents from the buffer + p.receiverRegistry.AddReceiver(jobID, connection) } -//Add log line to buffer and send to all current channels -func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line string) { - p.receiverBuffersLock.Lock() - for ch := range p.receiverBuffers[jobID] { - select { - case ch <- line: - default: - // Delete buffered channel if it's blocking. - delete(p.receiverBuffers[jobID], ch) - } - } - p.receiverBuffersLock.Unlock() +func (p *AsyncProjectCommandOutputHandler) CloseJob(jobID string) { + // Close active connections and remove receivers from registry + p.receiverRegistry.CloseAndRemoveReceiversForJob(jobID) - p.projectOutputBuffersLock.Lock() - if _, ok := p.projectOutputBuffers[jobID]; !ok { - p.projectOutputBuffers[jobID] = OutputBuffer{ - Buffer: []string{}, - } + // Update job status and persist to storage if configured + if err := p.jobStore.SetJobCompleteStatus(jobID, Complete); err != nil { + p.logger.Err("updating jobs status to complete", err) } - outputBuffer := p.projectOutputBuffers[jobID] - outputBuffer.Buffer = append(outputBuffer.Buffer, line) - p.projectOutputBuffers[jobID] = outputBuffer - - p.projectOutputBuffersLock.Unlock() } -//Remove channel, so client no longer receives Terraform output -func (p *AsyncProjectCommandOutputHandler) Deregister(jobID string, ch chan string) { - p.logger.Debug("Removing channel for %s", jobID) - p.receiverBuffersLock.Lock() - delete(p.receiverBuffers[jobID], ch) - p.receiverBuffersLock.Unlock() +func (p *AsyncProjectCommandOutputHandler) CleanUp(pullInfo PullInfo) { + if value, ok := p.pullToJobMapping.Load(pullInfo); ok { + jobMapping := value.(map[string]bool) + for jobID := range jobMapping { + // Clear output buffer for the job + p.jobStore.RemoveJob(jobID) + + // Close connections and clear registry for the job + p.receiverRegistry.CloseAndRemoveReceiversForJob(jobID) + } + + // Remove pull to job mapping for the job + p.pullToJobMapping.Delete(pullInfo) + } } +// Helper methods for testing func (p *AsyncProjectCommandOutputHandler) GetReceiverBufferForPull(jobID string) map[chan string]bool { - return p.receiverBuffers[jobID] + return p.receiverRegistry.GetReceivers(jobID) } -func (p *AsyncProjectCommandOutputHandler) GetProjectOutputBuffer(jobID string) OutputBuffer { - return p.projectOutputBuffers[jobID] +func (p *AsyncProjectCommandOutputHandler) GetJob(jobID string) Job { + job, _ := p.jobStore.Get(jobID) + return job } func (p *AsyncProjectCommandOutputHandler) GetJobIdMapForPull(pullInfo PullInfo) map[string]bool { @@ -228,39 +188,19 @@ func (p *AsyncProjectCommandOutputHandler) GetJobIdMapForPull(pullInfo PullInfo) return nil } -func (p *AsyncProjectCommandOutputHandler) CleanUp(pullInfo PullInfo) { - if value, ok := p.pullToJobMapping.Load(pullInfo); ok { - jobMapping := value.(map[string]bool) - for jobID := range jobMapping { - p.projectOutputBuffersLock.Lock() - delete(p.projectOutputBuffers, jobID) - p.projectOutputBuffersLock.Unlock() - - p.receiverBuffersLock.Lock() - delete(p.receiverBuffers, jobID) - p.receiverBuffersLock.Unlock() - } - - // Remove job mapping - p.pullToJobMapping.Delete(pullInfo) - } -} - // NoopProjectOutputHandler is a mock that doesn't do anything type NoopProjectOutputHandler struct{} -func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string, isOperationComplete bool) { +func (p *NoopProjectOutputHandler) Send(ctx models.ProjectCommandContext, msg string) { } -func (p *NoopProjectOutputHandler) Register(jobID string, receiver chan string) {} -func (p *NoopProjectOutputHandler) Deregister(jobID string, receiver chan string) {} - func (p *NoopProjectOutputHandler) Handle() { } +func (p *NoopProjectOutputHandler) Register(jobID string, receiver chan string) {} + func (p *NoopProjectOutputHandler) CleanUp(pullInfo PullInfo) { } -func (p *NoopProjectOutputHandler) IsKeyExists(key string) bool { - return false +func (p *NoopProjectOutputHandler) CloseJob(jobID string) { } diff --git a/server/jobs/project_command_output_handler_test.go b/server/jobs/project_command_output_handler_test.go index e6476960dc..e9c1d89d67 100644 --- a/server/jobs/project_command_output_handler_test.go +++ b/server/jobs/project_command_output_handler_test.go @@ -5,11 +5,15 @@ import ( "testing" "time" + . "github.com/petergtz/pegomock" + "github.com/stretchr/testify/assert" + "github.com/runatlantis/atlantis/server/events/models" "github.com/runatlantis/atlantis/server/jobs" + "github.com/runatlantis/atlantis/server/jobs/mocks" "github.com/runatlantis/atlantis/server/logging" + . "github.com/runatlantis/atlantis/testing" - "github.com/stretchr/testify/assert" ) func createTestProjectCmdContext(t *testing.T) models.ProjectCommandContext { @@ -41,19 +45,21 @@ func createTestProjectCmdContext(t *testing.T) models.ProjectCommandContext { } } -func createProjectCommandOutputHandler(t *testing.T) jobs.ProjectCommandOutputHandler { +func createProjectCommandOutputHandler(t *testing.T) (jobs.ProjectCommandOutputHandler, *mocks.MockJobStore) { logger := logging.NewNoopLogger(t) prjCmdOutputChan := make(chan *jobs.ProjectCmdOutputLine) + jobStore := mocks.NewMockJobStore() prjCmdOutputHandler := jobs.NewAsyncProjectCommandOutputHandler( prjCmdOutputChan, logger, + jobStore, ) go func() { prjCmdOutputHandler.Handle() }() - return prjCmdOutputHandler + return prjCmdOutputHandler, jobStore } func TestProjectCommandOutputHandler(t *testing.T) { @@ -63,18 +69,11 @@ func TestProjectCommandOutputHandler(t *testing.T) { t.Run("receive message from main channel", func(t *testing.T) { var wg sync.WaitGroup var expectedMsg string - projectOutputHandler := createProjectCommandOutputHandler(t) + projectOutputHandler, jobStore := createProjectCommandOutputHandler(t) + When(jobStore.Get(AnyString())).ThenReturn(jobs.Job{}, nil) ch := make(chan string) - // register channel and backfill from buffer - // Note: We call this synchronously because otherwise - // there could be a race where we are unable to register the channel - // before sending messages due to the way we lock our buffer memory cache - projectOutputHandler.Register(ctx.JobID, ch) - - wg.Add(1) - // read from channel go func() { for msg := range ch { @@ -83,7 +82,14 @@ func TestProjectCommandOutputHandler(t *testing.T) { } }() - projectOutputHandler.Send(ctx, Msg, false) + // register channel and backfill from buffer + // Note: We call this synchronously because otherwise + // there could be a race where we are unable to register the channel + // before sending messages due to the way we lock our buffer memory cache + projectOutputHandler.Register(ctx.JobID, ch) + + wg.Add(1) + projectOutputHandler.Send(ctx, Msg) wg.Wait() close(ch) @@ -93,10 +99,15 @@ func TestProjectCommandOutputHandler(t *testing.T) { t.Run("copies buffer to new channels", func(t *testing.T) { var wg sync.WaitGroup - projectOutputHandler := createProjectCommandOutputHandler(t) + projectOutputHandler, jobStore := createProjectCommandOutputHandler(t) + When(jobStore.Get(AnyString())).ThenReturn(jobs.Job{ + Output: []string{Msg}, + Status: jobs.Processing, + }, nil) - // send first message to populated the buffer - projectOutputHandler.Send(ctx, Msg, false) + // send first message to populate the buffer + projectOutputHandler.Send(ctx, Msg) + time.Sleep(10 * time.Millisecond) ch := make(chan string) @@ -114,13 +125,14 @@ func TestProjectCommandOutputHandler(t *testing.T) { } } }() + // register channel and backfill from buffer // Note: We call this synchronously because otherwise // there could be a race where we are unable to register the channel // before sending messages due to the way we lock our buffer memory cache projectOutputHandler.Register(ctx.JobID, ch) - projectOutputHandler.Send(ctx, Msg, false) + projectOutputHandler.Send(ctx, Msg) wg.Wait() close(ch) @@ -133,7 +145,8 @@ func TestProjectCommandOutputHandler(t *testing.T) { t.Run("clean up all jobs when PR is closed", func(t *testing.T) { var wg sync.WaitGroup - projectOutputHandler := createProjectCommandOutputHandler(t) + projectOutputHandler, jobStore := createProjectCommandOutputHandler(t) + When(jobStore.Get(AnyString())).ThenReturn(jobs.Job{}, nil) ch := make(chan string) @@ -154,8 +167,8 @@ func TestProjectCommandOutputHandler(t *testing.T) { } }() - projectOutputHandler.Send(ctx, Msg, false) - projectOutputHandler.Send(ctx, "Complete", false) + projectOutputHandler.Send(ctx, Msg) + projectOutputHandler.Send(ctx, "Complete") pullContext := jobs.PullInfo{ PullNum: ctx.Pull.Num, @@ -169,80 +182,35 @@ func TestProjectCommandOutputHandler(t *testing.T) { dfProjectOutputHandler, ok := projectOutputHandler.(*jobs.AsyncProjectCommandOutputHandler) assert.True(t, ok) - assert.Empty(t, dfProjectOutputHandler.GetProjectOutputBuffer(ctx.JobID)) + assert.Empty(t, dfProjectOutputHandler.GetJob(ctx.JobID).Output) assert.Empty(t, dfProjectOutputHandler.GetReceiverBufferForPull(ctx.JobID)) assert.Empty(t, dfProjectOutputHandler.GetJobIdMapForPull(pullContext)) }) - t.Run("mark operation status complete and close conn buffers for the job", func(t *testing.T) { - projectOutputHandler := createProjectCommandOutputHandler(t) + t.Run("close conn buffer after streaming logs for completed operation", func(t *testing.T) { + projectOutputHandler, jobStore := createProjectCommandOutputHandler(t) + job := jobs.Job{ + Output: []string{"a", "b"}, + Status: jobs.Complete, + } + When(jobStore.Get(AnyString())).ThenReturn(job, nil) ch := make(chan string) - // register channel and backfill from buffer - // Note: We call this synchronously because otherwise - // there could be a race where we are unable to register the channel - // before sending messages due to the way we lock our buffer memory cache - projectOutputHandler.Register(ctx.JobID, ch) - - // read from channel + opComplete := make(chan bool) + // buffer channel will be closed immediately after logs are streamed go func() { - for _ = range ch { + for range ch { } + opComplete <- true }() - projectOutputHandler.Send(ctx, Msg, false) - projectOutputHandler.Send(ctx, "", true) - - // Wait for the handler to process the message - time.Sleep(10 * time.Millisecond) - - dfProjectOutputHandler, ok := projectOutputHandler.(*jobs.AsyncProjectCommandOutputHandler) - assert.True(t, ok) - - outputBuffer := dfProjectOutputHandler.GetProjectOutputBuffer(ctx.JobID) - assert.True(t, outputBuffer.OperationComplete) - - _, ok = (<-ch) - assert.False(t, ok) - - }) - - t.Run("close conn buffer after streaming logs for completed operation", func(t *testing.T) { - projectOutputHandler := createProjectCommandOutputHandler(t) - - ch := make(chan string) - // register channel and backfill from buffer // Note: We call this synchronously because otherwise // there could be a race where we are unable to register the channel // before sending messages due to the way we lock our buffer memory cache projectOutputHandler.Register(ctx.JobID, ch) - // read from channel - go func() { - for _ = range ch { - } - }() - - projectOutputHandler.Send(ctx, Msg, false) - projectOutputHandler.Send(ctx, "", true) - - // Wait for the handler to process the message - time.Sleep(10 * time.Millisecond) - - ch_2 := make(chan string) - opComplete := make(chan bool) - - // buffer channel will be closed immediately after logs are streamed - go func() { - for _ = range ch_2 { - } - opComplete <- true - }() - - projectOutputHandler.Register(ctx.JobID, ch_2) - assert.True(t, <-opComplete) }) } diff --git a/server/jobs/receiver_registry.go b/server/jobs/receiver_registry.go new file mode 100644 index 0000000000..39601f5416 --- /dev/null +++ b/server/jobs/receiver_registry.go @@ -0,0 +1,58 @@ +package jobs + +import "sync" + +type ReceiverRegistry interface { + AddReceiver(jobID string, ch chan string) + RemoveReceiver(jobID string, ch chan string) + GetReceivers(jobID string) map[chan string]bool + CloseAndRemoveReceiversForJob(jobID string) +} + +type receiverRegistry struct { + receivers map[string]map[chan string]bool + lock sync.RWMutex +} + +func NewReceiverRegistry() *receiverRegistry { + return &receiverRegistry{ + receivers: map[string]map[chan string]bool{}, + } +} + +func (r *receiverRegistry) AddReceiver(jobID string, ch chan string) { + r.lock.Lock() + defer r.lock.Unlock() + + if r.receivers[jobID] == nil { + r.receivers[jobID] = map[chan string]bool{} + } + + r.receivers[jobID][ch] = true +} + +func (r *receiverRegistry) RemoveReceiver(jobID string, ch chan string) { + r.lock.Lock() + defer r.lock.Unlock() + + delete(r.receivers[jobID], ch) +} + +func (r *receiverRegistry) GetReceivers(jobID string) map[chan string]bool { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.receivers[jobID] +} + +func (r *receiverRegistry) CloseAndRemoveReceiversForJob(jobID string) { + r.lock.Lock() + defer r.lock.Unlock() + + for ch := range r.receivers[jobID] { + close(ch) + delete(r.receivers[jobID], ch) + } + + delete(r.receivers, jobID) +} diff --git a/server/jobs/storage_backend.go b/server/jobs/storage_backend.go new file mode 100644 index 0000000000..558be75d07 --- /dev/null +++ b/server/jobs/storage_backend.go @@ -0,0 +1,31 @@ +package jobs + +import ( + "github.com/runatlantis/atlantis/server/core/config/valid" +) + +//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_storage_backend.go StorageBackend + +type StorageBackend interface { + // Read logs from the storage backend. Must close the reader + Read(key string) ([]string, error) + + // Write logs to the storage backend + Write(key string, logs []string) (success bool, err error) +} + +func NewStorageBackend(jobs valid.Jobs) StorageBackend { + // No storage backend configured, return Noop for now + return &NoopStorageBackend{} +} + +// Used when log persistence is not configured +type NoopStorageBackend struct{} + +func (s *NoopStorageBackend) Read(key string) ([]string, error) { + return []string{}, nil +} + +func (s *NoopStorageBackend) Write(key string, logs []string) (success bool, err error) { + return false, nil +} diff --git a/server/server.go b/server/server.go index 4479e44910..2acfe02150 100644 --- a/server/server.go +++ b/server/server.go @@ -368,6 +368,10 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { Underlying: underlyingRouter, } + jobStore := jobs.NewJobStore( + jobs.NewStorageBackend(globalCfg.Jobs), + ) + var projectCmdOutputHandler jobs.ProjectCommandOutputHandler // When TFE is enabled log streaming is not necessary. @@ -378,6 +382,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { projectCmdOutputHandler = jobs.NewAsyncProjectCommandOutputHandler( projectCmdOutput, logger, + jobStore, ) } @@ -594,9 +599,9 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { } projectOutputWrapper := &events.ProjectOutputWrapper{ - JobMessageSender: projectCmdOutputHandler, ProjectCommandRunner: projectCommandRunner, JobURLSetter: jobs.NewJobURLSetter(router, commitStatusUpdater), + JobCloser: projectCmdOutputHandler, } featureAwareProjectCommandRunner := &events.FeatureAwareProjectCommandRunner{ @@ -759,6 +764,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { DeleteLockCommand: deleteLockCommand, } + // No Storage backend configured for now wsMux := websocket.NewMultiplexor( logger, controllers.JobIDKeyGenerator{},