diff --git a/cmd/pipeline/main.go b/cmd/pipeline/main.go index db48de0ea0..7cfd6e7835 100644 --- a/cmd/pipeline/main.go +++ b/cmd/pipeline/main.go @@ -632,7 +632,6 @@ func main() { base.GET("version", gin.WrapH(buildinfo.Handler(buildInfo))) auth.Install(engine) - auth.StartTokenStoreGC(tokenStore) enforcer := auth.NewRbacEnforcer(organizationStore, serviceAccountService, commonLogger) authorizationMiddleware := ginauth.NewMiddleware(enforcer, basePath, errorHandler) diff --git a/cmd/worker/auth.go b/cmd/worker/auth.go new file mode 100644 index 0000000000..266a8c3905 --- /dev/null +++ b/cmd/worker/auth.go @@ -0,0 +1,30 @@ +// Copyright © 2021 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + bvsdkauth "github.com/banzaicloud/bank-vaults/pkg/sdk/auth" + "go.uber.org/cadence/worker" + + authworkflow "github.com/banzaicloud/pipeline/src/auth/workflow" +) + +// registerCronWorkflows registers the domain specific cron workflows +func registerAuthWorkflows(worker worker.Worker, tokenStore bvsdkauth.TokenStore) (err error) { + authworkflow.NewStartTokenStoreGCActivity(tokenStore).Register(worker) + authworkflow.NewGarbageCollectTokenStoreWorkflow().Register(worker) + + return nil +} diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 7ac587ab6a..401dc2469c 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -21,6 +21,7 @@ import ( "os" "syscall" "text/template" + "time" "emperror.dev/emperror" "emperror.dev/errors" @@ -109,8 +110,10 @@ import ( "github.com/banzaicloud/pipeline/pkg/cadence/awssdk" pkgCluster "github.com/banzaicloud/pipeline/pkg/cluster" "github.com/banzaicloud/pipeline/pkg/hook" + sdkcadence "github.com/banzaicloud/pipeline/pkg/sdk/cadence" "github.com/banzaicloud/pipeline/src/auth" "github.com/banzaicloud/pipeline/src/auth/authdriver" + authworkflow "github.com/banzaicloud/pipeline/src/auth/workflow" "github.com/banzaicloud/pipeline/src/cluster" legacyclusteradapter "github.com/banzaicloud/pipeline/src/cluster/clusteradapter" "github.com/banzaicloud/pipeline/src/dns" @@ -257,7 +260,21 @@ func main() { clusteradapter.NewStore(db, clusterRepo), releaseDeleter, ) + tokenStore := bauth.NewVaultTokenStore("pipeline") + err = registerAuthWorkflows(worker, tokenStore) + emperror.Panic(errors.WrapIf(err, "failed to register auth workflows")) + tokenStoreGCCronConfiguration := sdkcadence.NewCronConfiguration( + workflowClient, + sdkcadence.CronInstanceTypeDomain, + "0 0/12 * * *", + 11*time.Hour, + taskList, + authworkflow.GarbageCollectTokenStoreWorkflowName, + ) + err = tokenStoreGCCronConfiguration.StartCronWorkflow(context.Background()) + emperror.Panic(errors.WrapIf(err, "failed to start token store garbage collection cron workflow")) + tokenManager := pkgAuth.NewTokenManager( pkgAuth.NewJWTTokenGenerator( config.Auth.Token.Issuer, diff --git a/pkg/sdk/cadence/cron.go b/pkg/sdk/cadence/cron.go new file mode 100644 index 0000000000..80e65e8658 --- /dev/null +++ b/pkg/sdk/cadence/cron.go @@ -0,0 +1,195 @@ +// Copyright © 2021 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cadence + +import ( + "context" + "time" + + "emperror.dev/errors" + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/client" +) + +// CronConfiguration encapsulates information about a cron workflow. +type CronConfiguration struct { + CadenceClient client.Client + CronInstanceType CronInstanceType + CronSchedule string + ExecutionStartToCloseTimeout time.Duration + TaskListName string + Workflow string + WorkflowArguments []interface{} +} + +// NewCronConfiguration instantiates a cron configuration from the specified +// values. +func NewCronConfiguration( + cadenceClient client.Client, + cronInstanceType CronInstanceType, + cronSchedule string, + executionStartToCloseTimeout time.Duration, + taskListName string, + workflow string, + workflowArguments ...interface{}, +) CronConfiguration { + return CronConfiguration{ + CadenceClient: cadenceClient, + CronInstanceType: cronInstanceType, + CronSchedule: cronSchedule, + ExecutionStartToCloseTimeout: executionStartToCloseTimeout, + TaskListName: taskListName, + WorkflowArguments: workflowArguments, + Workflow: workflow, + } +} + +// CronWorkflowID returns the ID of the cron workflow based on its configuration. +func (cronConfig CronConfiguration) CronWorkflowID() (cronWorkflowID string) { + switch cronConfig.CronInstanceType { + case CronInstanceTypeDomain: + return cronConfig.TaskListName + "-cron-" + cronConfig.Workflow + } + + return string(cronConfig.CronInstanceType) + "-cron-" + cronConfig.Workflow +} + +// StartCronWorkflow initiates a Cadence cron workflow from the specified type. +// +// WARNING: the current implementation allows only single instance crons (one +// per Cadence server). +func (cronConfig CronConfiguration) StartCronWorkflow(ctx context.Context) (err error) { + state, err := cronConfig.WorkflowState(ctx) + if err != nil { + return errors.Wrap(err, "querying workflow state failed") + } + + switch state { + case CronWorkflowStateScheduled: // Note: nothing to do. + return nil + case CronWorkflowStateScheduledOutdated: // Note: restart required. + err = cronConfig.CadenceClient.TerminateWorkflow( + ctx, + cronConfig.CronWorkflowID(), + "", + "cron workflow schedule requires an update", + nil, + ) + if err != nil { + return errors.Wrap(err, "terminating cron workflow failed") + } + } + + cronWorkflowOptions := client.StartWorkflowOptions{ + ID: cronConfig.CronWorkflowID(), + TaskList: cronConfig.TaskListName, + ExecutionStartToCloseTimeout: cronConfig.ExecutionStartToCloseTimeout, + CronSchedule: cronConfig.CronSchedule, + Memo: map[string]interface{}{ // Note: CronSchedule is not directly retrievable (version 0.13.4-0.15.0). + "CronSchedule": cronConfig.CronSchedule, + }, + } + + _, err = cronConfig.CadenceClient.StartWorkflow( + ctx, + cronWorkflowOptions, + cronConfig.Workflow, + cronConfig.WorkflowArguments..., + ) + if err != nil { + return errors.Wrap(err, "starting cron workflow failed") + } + + return nil +} + +// WorkflowState queries the state of the cron workflow corresponding to the +// cron configuration. +func (cronConfig CronConfiguration) WorkflowState(ctx context.Context) (workflowState CronWorkflowState, err error) { + cronWorkflowDescription, err := cronConfig.CadenceClient.DescribeWorkflowExecution( + ctx, + cronConfig.CronWorkflowID(), + "", + ) + if errors.As(err, new(*shared.EntityNotExistsError)) { + return CronWorkflowStateNotScheduled, nil + } else if err != nil { + return CronWorkflowStateUnknown, errors.Wrap(err, "failed to query cron workflow") + } else if cronWorkflowDescription == nil || + cronWorkflowDescription.WorkflowExecutionInfo == nil { + return CronWorkflowStateUnknown, errors.New("cron workflow execution information not found") + } + + executionInfo := cronWorkflowDescription.WorkflowExecutionInfo + + closeStatus := executionInfo.GetCloseStatus() + // Note: https://cadenceworkflow.io/docs/go-client/distributed-cron cron + // workflows only stop when cancelled or terminated. + if closeStatus == shared.WorkflowExecutionCloseStatusCanceled || + closeStatus == shared.WorkflowExecutionCloseStatusTerminated { + return CronWorkflowStateNotScheduled, nil + } + + activeCronSchedule := "" + if executionInfo.Memo != nil && + executionInfo.Memo.Fields["CronSchedule"] != nil { + value := client.NewValue(executionInfo.Memo.Fields["CronSchedule"]) + if value.HasValue() { + err = value.Get(&activeCronSchedule) + if err != nil { + return CronWorkflowStateUnknown, errors.Wrap(err, "retrieving cron schedule failed") + } + } + } + + if activeCronSchedule != cronConfig.CronSchedule { + return CronWorkflowStateScheduledOutdated, nil + } + + return CronWorkflowStateScheduled, nil +} + +// CronInstanceType determines how cron instances are treated in case of a +// multiworker environment. +type CronInstanceType string + +const ( + // CronInstanceTypeDomain specifies only one instance of the scheduled cron + // workflow can exist per Cadence domain. + CronInstanceTypeDomain CronInstanceType = "domain" +) + +// CronWorkflowState describes the state of a cron workflow which can be used +// for cron workflow operations. +type CronWorkflowState string + +const ( + // CronWorkflowStateNotScheduled defines the state when no corresponding + // cron workflow schedule exists. + CronWorkflowStateNotScheduled CronWorkflowState = "not-scheduled" + + // CronWorkflowStateScheduled defines the state when the corresponding cron + // workflow is scheduled to run with the latest known schedule. + CronWorkflowStateScheduled CronWorkflowState = "scheduled" + + // CronWorkflowStateScheduledOutdated defines the state when the + // corresponding cron workflow is scheduled to run using an outdated + // schedule. + CronWorkflowStateScheduledOutdated CronWorkflowState = "scheduled-outdated" + + // CronWorkflowStateUnknown defines the state when no information is + // available about the corresponding cron workflow. + CronWorkflowStateUnknown CronWorkflowState = "unknown" +) diff --git a/pkg/sdk/cadence/cron_test.go b/pkg/sdk/cadence/cron_test.go new file mode 100644 index 0000000000..af375c1cf1 --- /dev/null +++ b/pkg/sdk/cadence/cron_test.go @@ -0,0 +1,734 @@ +// Copyright © 2021 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cadence + +import ( + "context" + "testing" + "time" + + "emperror.dev/errors" + "github.com/stretchr/testify/require" + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/client" + "go.uber.org/cadence/mocks" + "go.uber.org/cadence/workflow" +) + +func TestNewCronConfiguration(t *testing.T) { + type inputType struct { + CadenceClient client.Client + CronInstanceType CronInstanceType + CronSchedule string + ExecutionStartToCloseTimeout time.Duration + TaskListName string + Workflow string + WorkflowArguments []interface{} + } + + type caseType struct { + caseDescription string + expectedCronConfiguration CronConfiguration + input inputType + } + + testCases := []caseType{ + { + caseDescription: "non-zero values -> non-zero configuration", + expectedCronConfiguration: CronConfiguration{ + CadenceClient: &mocks.Client{}, + CronInstanceType: CronInstanceTypeDomain, + CronSchedule: "0/1 * * * *", + ExecutionStartToCloseTimeout: time.Second, + TaskListName: "task-list-name", + Workflow: "workflow", + WorkflowArguments: []interface{}{ + false, + 1, + "2", + }, + }, + input: inputType{ + CadenceClient: &mocks.Client{}, + CronInstanceType: CronInstanceTypeDomain, + CronSchedule: "0/1 * * * *", + ExecutionStartToCloseTimeout: time.Second, + TaskListName: "task-list-name", + Workflow: "workflow", + WorkflowArguments: []interface{}{ + false, + 1, + "2", + }, + }, + }, + { + caseDescription: "zero values -> zero configuration", + expectedCronConfiguration: CronConfiguration{}, + input: inputType{}, + }, + } + + for _, testCase := range testCases { + testCase := testCase + + t.Run(testCase.caseDescription, func(t *testing.T) { + actualCronConfiguration := NewCronConfiguration( + testCase.input.CadenceClient, + testCase.input.CronInstanceType, + testCase.input.CronSchedule, + testCase.input.ExecutionStartToCloseTimeout, + testCase.input.TaskListName, + testCase.input.Workflow, + testCase.input.WorkflowArguments..., + ) + + require.Equal(t, testCase.expectedCronConfiguration, actualCronConfiguration) + }) + } +} + +func TestCronConfigurationCronWorkflowID(t *testing.T) { + type caseType struct { + caseDescription string + expectedCronWorkflowID string + inputCronConfiguration CronConfiguration + } + + testCases := []caseType{ + { + caseDescription: "domain cron instance type -> task list cron workflow id", + expectedCronWorkflowID: "task-list-name-cron-workflow", + inputCronConfiguration: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + }, + { + caseDescription: "unknown cron instance type -> instance type cron workflow id", + expectedCronWorkflowID: "unknown-instance-type-cron-workflow", + inputCronConfiguration: NewCronConfiguration( + &mocks.Client{}, + CronInstanceType("unknown-instance-type"), + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + }, + } + + for _, testCase := range testCases { + testCase := testCase + + t.Run(testCase.caseDescription, func(t *testing.T) { + actualCronWorkflowID := testCase.inputCronConfiguration.CronWorkflowID() + + require.Equal(t, testCase.expectedCronWorkflowID, actualCronWorkflowID) + }) + } +} + +func TestCronConfigurationStartCronWorkflow(t *testing.T) { + type inputType struct { + cronConfig CronConfiguration + ctx context.Context + } + + type mocksType struct { + workflowExecutionDescription *shared.DescribeWorkflowExecutionResponse + workflowExecutionDescriptionError error + workflowTerminationError error + workflowStartError error + } + + type caseType struct { + caseDescription string + expectedError error + input inputType + mocks mocksType + } + + testCases := []caseType{ + { + caseDescription: "state error -> error", + expectedError: errors.New( + "querying workflow state failed" + + ": failed to query cron workflow" + + ": test error", + ), + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: nil, + workflowExecutionDescriptionError: errors.New("test error"), + }, + }, + { + caseDescription: "scheduled state -> nothing to do", + expectedError: nil, + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("\"0/1 * * * *\""), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + }, + }, + { + caseDescription: "scheduled outdated with termination error -> error", + expectedError: errors.New("terminating cron workflow failed: test workflow termination error"), + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("\"* 0/2 * * *\""), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + workflowTerminationError: errors.New("test workflow termination error"), + }, + }, + { + caseDescription: "scheduled outdated with start error -> error", + expectedError: errors.New("starting cron workflow failed: test workflow start error"), + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("\"* 0/2 * * *\""), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + workflowStartError: errors.New("test workflow start error"), + }, + }, + { + caseDescription: "scheduled outdated no error -> success", + expectedError: nil, + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("\"* 0/2 * * *\""), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + }, + }, + { + caseDescription: "not existing start error -> error", + expectedError: errors.New("starting cron workflow failed: test workflow start error"), + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: nil, + workflowExecutionDescriptionError: &shared.EntityNotExistsError{}, + workflowStartError: errors.New("test workflow start error"), + }, + }, + { + caseDescription: "not existing no error -> success", + expectedError: nil, + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: nil, + workflowExecutionDescriptionError: &shared.EntityNotExistsError{}, + }, + }, + } + + for _, testCase := range testCases { + testCase := testCase + + t.Run(testCase.caseDescription, func(t *testing.T) { + cronWorkflowID := testCase.input.cronConfig.CronWorkflowID() + cadenceClientMock := testCase.input.cronConfig.CadenceClient.(*mocks.Client) + cadenceClientMock.On("DescribeWorkflowExecution", testCase.input.ctx, cronWorkflowID, ""). + Return(testCase.mocks.workflowExecutionDescription, testCase.mocks.workflowExecutionDescriptionError). + Once() + + activeCronSchedule := "" + if testCase.mocks.workflowExecutionDescriptionError == nil { + executionInfo := testCase.mocks.workflowExecutionDescription.WorkflowExecutionInfo + err := client.NewValue(executionInfo.Memo.Fields["CronSchedule"]).Get(&activeCronSchedule) + require.NoError(t, err) + } + + if (testCase.mocks.workflowExecutionDescriptionError == nil || + errors.As(testCase.mocks.workflowExecutionDescriptionError, new(*shared.EntityNotExistsError))) && + activeCronSchedule != testCase.input.cronConfig.CronSchedule { + if activeCronSchedule != "" { + cadenceClientMock.On( + "TerminateWorkflow", + testCase.input.ctx, + cronWorkflowID, + "", + "cron workflow schedule requires an update", + ([]byte)(nil), + ).Return(testCase.mocks.workflowTerminationError).Once() + } + + if testCase.mocks.workflowTerminationError == nil { + workflowOptions := client.StartWorkflowOptions{ + ID: cronWorkflowID, + TaskList: testCase.input.cronConfig.TaskListName, + ExecutionStartToCloseTimeout: testCase.input.cronConfig.ExecutionStartToCloseTimeout, + CronSchedule: testCase.input.cronConfig.CronSchedule, + Memo: map[string]interface{}{ // Note: CronSchedule is not directly retrievable (version 0.13.4-0.15.0). + "CronSchedule": testCase.input.cronConfig.CronSchedule, + }, + } + cadenceClientMock.On( + "StartWorkflow", + append( + []interface{}{ + testCase.input.ctx, + workflowOptions, + testCase.input.cronConfig.Workflow, + }, + testCase.input.cronConfig.WorkflowArguments..., + )..., + ).Return(&workflow.Execution{}, testCase.mocks.workflowStartError).Once() + } + } + + actualError := testCase.input.cronConfig.StartCronWorkflow(testCase.input.ctx) + + cadenceClientMock.AssertExpectations(t) + + if testCase.expectedError == nil { + require.NoError(t, actualError) + } else { + require.EqualError(t, actualError, testCase.expectedError.Error()) + } + }) + } +} + +func TestCronConfigurationWorkflowState(t *testing.T) { + type inputType struct { + cronConfig CronConfiguration + ctx context.Context + } + + type mocksType struct { + workflowExecutionDescription *shared.DescribeWorkflowExecutionResponse + workflowExecutionDescriptionError error + } + + type outputType struct { + expectedWorkflowState CronWorkflowState + expectedError error + } + + type caseType struct { + caseDescription string + input inputType + mocks mocksType + output outputType + } + + testCases := []caseType{ + { + caseDescription: "not existing -> not scheduled state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: nil, + workflowExecutionDescriptionError: &shared.EntityNotExistsError{}, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateNotScheduled, + expectedError: nil, + }, + }, + { + caseDescription: "description error -> unknown state with error", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: nil, + workflowExecutionDescriptionError: errors.New("test error"), + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateUnknown, + expectedError: errors.New("failed to query cron workflow: test error"), + }, + }, + { + caseDescription: "nil workflow execution info -> unknown state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: nil, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateUnknown, + expectedError: errors.New("cron workflow execution information not found"), + }, + }, + { + caseDescription: "canceled workflow -> not scheduled state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + CloseStatus: func() *shared.WorkflowExecutionCloseStatus { + closeStatus := shared.WorkflowExecutionCloseStatusCanceled + + return &closeStatus + }(), + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateNotScheduled, + expectedError: nil, + }, + }, + { + caseDescription: "terminated workflow -> not scheduled state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + CloseStatus: func() *shared.WorkflowExecutionCloseStatus { + closeStatus := shared.WorkflowExecutionCloseStatusTerminated + + return &closeStatus + }(), + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateNotScheduled, + expectedError: nil, + }, + }, + { + caseDescription: "no memo -> scheduled outdated state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: nil, + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateScheduledOutdated, + expectedError: nil, + }, + }, + { + caseDescription: "no cron schedule -> scheduled outdated state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{}, + }, + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateScheduledOutdated, + expectedError: nil, + }, + }, + { + caseDescription: "invalid cron schedule -> unknown state with error", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("not-a-valid-cron-schedule"), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateUnknown, + expectedError: errors.New( + "retrieving cron schedule failed" + + ": unable to decode argument: 0, *string, with json error" + + ": invalid character 'o' in literal null (expecting 'u')"), + }, + }, + { + caseDescription: "mismatching cron schedule -> scheduled outdated state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("\"* 0/2 * * *\""), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateScheduledOutdated, + expectedError: nil, + }, + }, + { + caseDescription: "matching cron schedule -> scheduled state", + input: inputType{ + cronConfig: NewCronConfiguration( + &mocks.Client{}, + CronInstanceTypeDomain, + "0/1 * * * *", + time.Second, + "task-list-name", + "workflow", + ), + ctx: context.Background(), + }, + mocks: mocksType{ + workflowExecutionDescription: &shared.DescribeWorkflowExecutionResponse{ + WorkflowExecutionInfo: &shared.WorkflowExecutionInfo{ + Memo: &shared.Memo{ + Fields: map[string][]byte{ + "CronSchedule": []byte("\"0/1 * * * *\""), + }, + }, + }, + }, + workflowExecutionDescriptionError: nil, + }, + output: outputType{ + expectedWorkflowState: CronWorkflowStateScheduled, + expectedError: nil, + }, + }, + } + + for _, testCase := range testCases { + testCase := testCase + + t.Run(testCase.caseDescription, func(t *testing.T) { + cronWorkflowID := testCase.input.cronConfig.CronWorkflowID() + cadenceClientMock := testCase.input.cronConfig.CadenceClient.(*mocks.Client) + cadenceClientMock.On("DescribeWorkflowExecution", testCase.input.ctx, cronWorkflowID, ""). + Return(testCase.mocks.workflowExecutionDescription, testCase.mocks.workflowExecutionDescriptionError). + Once() + + actualWorkflowState, actualError := testCase.input.cronConfig.WorkflowState(testCase.input.ctx) + + cadenceClientMock.AssertExpectations(t) + + if testCase.output.expectedError == nil { + require.NoError(t, actualError) + } else { + require.EqualError(t, actualError, testCase.output.expectedError.Error()) + } + require.Equal(t, testCase.output.expectedWorkflowState, actualWorkflowState) + }) + } +} diff --git a/src/auth/authn.go b/src/auth/authn.go index ccb0c196f2..be2a2470cd 100644 --- a/src/auth/authn.go +++ b/src/auth/authn.go @@ -219,21 +219,6 @@ func SyncOrgsForUser( return organizationSyncer.SyncOrganizations(request.Context(), *user, idTokenClaims) } -func StartTokenStoreGC(tokenStore bauth.TokenStore) { - ticker := time.NewTicker(time.Hour * 12) - go func() { - for tick := range ticker.C { - _ = tick - err := tokenStore.GC() - if err != nil { - errorHandler.Handle(errors.Wrap(err, "failed to garbage collect TokenStore")) - } else { - log.Info("TokenStore garbage collected") - } - } - }() -} - // Install the whole OAuth and JWT Token based authn/authz mechanism to the specified Gin Engine. func Install(engine *gin.Engine) { authHandler := Auth.HandlerFunc() diff --git a/src/auth/workflow/garbage_collect_token_store_workflow.go b/src/auth/workflow/garbage_collect_token_store_workflow.go new file mode 100644 index 0000000000..59f65437fc --- /dev/null +++ b/src/auth/workflow/garbage_collect_token_store_workflow.go @@ -0,0 +1,74 @@ +// Copyright © 2021 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workflow + +import ( + "time" + + "go.uber.org/cadence" + "go.uber.org/cadence/workflow" + + "github.com/banzaicloud/pipeline/pkg/cadence/worker" +) + +// GarbageCollectTokenStoreWorkflowName is the name of the token store garbage +// collection workflow. +const GarbageCollectTokenStoreWorkflowName = "auth-garbage-collect-token-store" + +// GarbageCollectTokenStoreWorkflow defines a Cadence workflow encapsulating +// high level input-independent components required to garbage collect the token +// store. +type GarbageCollectTokenStoreWorkflow struct{} + +// NewGarbageCollectTokenStoreWorkflow instantiates a token store garbage +// collection workflow. +func NewGarbageCollectTokenStoreWorkflow() *GarbageCollectTokenStoreWorkflow { + return &GarbageCollectTokenStoreWorkflow{} +} + +// Execute runs the workflow. +func (w GarbageCollectTokenStoreWorkflow) Execute(ctx workflow.Context) (err error) { + logger := workflow.GetLogger(ctx) + + activityContext := workflow.WithActivityOptions( + ctx, + workflow.ActivityOptions{ + ScheduleToStartTimeout: 10 * time.Minute, + StartToCloseTimeout: time.Hour, + WaitForCancellation: true, + RetryPolicy: &cadence.RetryPolicy{ + InitialInterval: time.Hour, + BackoffCoefficient: 1.0, + ExpirationInterval: 6 * time.Hour, + MaximumAttempts: 3, + NonRetriableErrorReasons: []string{"cadenceInternal:Panic"}, + }, + }, + ) + + err = startTokenStoreGC(activityContext) + if err != nil { + return err + } + + logger.Info("TokenStore garbage collected") + + return nil +} + +// Register registers the workflow in the worker. +func (w GarbageCollectTokenStoreWorkflow) Register(worker worker.Registry) { + worker.RegisterWorkflowWithOptions(w.Execute, workflow.RegisterOptions{Name: GarbageCollectTokenStoreWorkflowName}) +} diff --git a/src/auth/workflow/start_token_store_gc_workflow.go b/src/auth/workflow/start_token_store_gc_workflow.go new file mode 100644 index 0000000000..381e87b9f5 --- /dev/null +++ b/src/auth/workflow/start_token_store_gc_workflow.go @@ -0,0 +1,75 @@ +// Copyright © 2021 Banzai Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workflow + +import ( + "context" + + "emperror.dev/errors" + bvsdkauth "github.com/banzaicloud/bank-vaults/pkg/sdk/auth" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + + "github.com/banzaicloud/pipeline/pkg/cadence/worker" +) + +// StartTokenStoreGCActivityName is the name of the token store garbage +// collection starter activity. +const StartTokenStoreGCActivityName = "auth-start-token-store-gc" + +// StartTokenStoreGCActivity collects the necessary component dependencies +// for executing a token store garbage collection starting operation. +type StartTokenStoreGCActivity struct { + tokenStore bvsdkauth.TokenStore +} + +// NewStartTokenStoreGCActivity instantiates a activity object for starting the +// token store garbage collection. +func NewStartTokenStoreGCActivity(tokenStore bvsdkauth.TokenStore) *StartTokenStoreGCActivity { + return &StartTokenStoreGCActivity{ + tokenStore: tokenStore, + } +} + +// Execute executes a token store garbage collection starting operation. +func (a *StartTokenStoreGCActivity) Execute(ctx context.Context) (err error) { + err = a.tokenStore.GC() + if err != nil { + return errors.Wrap(err, "failed to garbage collect TokenStore") + } + + return nil +} + +// Register registers the token store garbage collection starting activity. +func (a StartTokenStoreGCActivity) Register(worker worker.Registry) { + worker.RegisterActivityWithOptions(a.Execute, activity.RegisterOptions{Name: StartTokenStoreGCActivityName}) +} + +// startTokenStoreGC starts the token store garbage collection and returns an +// error if any occurs. +// +// This is a convenience wrapper around the corresponding activity. +func startTokenStoreGC(ctx workflow.Context) error { + return startTokenStoreGCAsync(ctx).Get(ctx, nil) +} + +// startTokenStoreGCAsync returns a future object for starting the token store +// garbage collection. +// +// This is a convenience wrapper around the corresponding activity. +func startTokenStoreGCAsync(ctx workflow.Context) workflow.Future { + return workflow.ExecuteActivity(ctx, StartTokenStoreGCActivityName) +}