Skip to content

Commit

Permalink
merge(#3419): refactored TokenStore GC with cron workflow
Browse files Browse the repository at this point in the history
Refactored TokenStore garbage collection to use cron workflow
#3419
  • Loading branch information
pregnor authored Feb 28, 2021
2 parents bebd80a + 9996cc2 commit 299b85d
Show file tree
Hide file tree
Showing 12 changed files with 1,152 additions and 18 deletions.
1 change: 0 additions & 1 deletion cmd/pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,6 @@ func main() {
base.GET("version", gin.WrapH(buildinfo.Handler(buildInfo)))

auth.Install(engine)
auth.StartTokenStoreGC(tokenStore)

enforcer := auth.NewRbacEnforcer(organizationStore, serviceAccountService, commonLogger)
authorizationMiddleware := ginauth.NewMiddleware(enforcer, basePath, errorHandler)
Expand Down
4 changes: 4 additions & 0 deletions cmd/worker/BUILD.plz
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ go_binary(
"//pkg/cadence/awssdk",
"//pkg/cluster",
"//pkg/hook",
"//pkg/sdk/cadence",
"//pkg/sdk/cadence/lib/pipeline/processlog",
"//src/auth",
"//src/auth/authdriver",
"//src/auth/workflow",
"//src/cluster",
"//src/cluster/clusteradapter",
"//src/dns",
Expand Down Expand Up @@ -213,9 +215,11 @@ go_test(
"//pkg/cluster",
"//pkg/hook",
"//pkg/mirror",
"//pkg/sdk/cadence",
"//pkg/sdk/cadence/lib/pipeline/processlog",
"//src/auth",
"//src/auth/authdriver",
"//src/auth/workflow",
"//src/cluster",
"//src/cluster/clusteradapter",
"//src/dns",
Expand Down
30 changes: 30 additions & 0 deletions cmd/worker/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright © 2021 Banzai Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
bvsdkauth "github.com/banzaicloud/bank-vaults/pkg/sdk/auth"
"go.uber.org/cadence/worker"

authworkflow "github.com/banzaicloud/pipeline/src/auth/workflow"
)

// registerCronWorkflows registers the domain specific cron workflows
func registerAuthWorkflows(worker worker.Worker, tokenStore bvsdkauth.TokenStore) (err error) {
authworkflow.NewStartTokenStoreGCActivity(tokenStore).Register(worker)
authworkflow.NewGarbageCollectTokenStoreWorkflow().Register(worker)

return nil
}
17 changes: 17 additions & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"syscall"
"text/template"
"time"

"emperror.dev/emperror"
"emperror.dev/errors"
Expand Down Expand Up @@ -110,8 +111,10 @@ import (
"github.com/banzaicloud/pipeline/pkg/cadence/awssdk"
pkgCluster "github.com/banzaicloud/pipeline/pkg/cluster"
"github.com/banzaicloud/pipeline/pkg/hook"
sdkcadence "github.com/banzaicloud/pipeline/pkg/sdk/cadence"
"github.com/banzaicloud/pipeline/src/auth"
"github.com/banzaicloud/pipeline/src/auth/authdriver"
authworkflow "github.com/banzaicloud/pipeline/src/auth/workflow"
"github.com/banzaicloud/pipeline/src/cluster"
legacyclusteradapter "github.com/banzaicloud/pipeline/src/cluster/clusteradapter"
"github.com/banzaicloud/pipeline/src/dns"
Expand Down Expand Up @@ -258,7 +261,21 @@ func main() {
clusteradapter.NewStore(db, clusterRepo),
releaseDeleter,
)

tokenStore := bauth.NewVaultTokenStore("pipeline")
err = registerAuthWorkflows(worker, tokenStore)
emperror.Panic(errors.WrapIf(err, "failed to register auth workflows"))
tokenStoreGCCronConfiguration := sdkcadence.NewCronConfiguration(
workflowClient,
sdkcadence.CronInstanceTypeDomain,
"0 0/12 * * *",
11*time.Hour,
taskList,
authworkflow.GarbageCollectTokenStoreWorkflowName,
)
err = tokenStoreGCCronConfiguration.StartCronWorkflow(context.Background())
emperror.Panic(errors.WrapIf(err, "failed to start token store garbage collection cron workflow"))

tokenManager := pkgAuth.NewTokenManager(
pkgAuth.NewJWTTokenGenerator(
config.Auth.Token.Issuer,
Expand Down
10 changes: 9 additions & 1 deletion pkg/sdk/cadence/BUILD.plz
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ go_library(
exclude = ["*_test.go"],
),
visibility = ["PUBLIC"],
deps = ["//third_party/go:go.uber.org__cadence__workflow"],
deps = [
"//third_party/go:emperror.dev__errors",
"//third_party/go:go.uber.org__cadence__.gen__go__shared",
"//third_party/go:go.uber.org__cadence__client",
"//third_party/go:go.uber.org__cadence__workflow",
],
)

go_test(
Expand All @@ -14,6 +19,9 @@ go_test(
deps = [
"//third_party/go:emperror.dev__errors",
"//third_party/go:github.com__stretchr__testify__require",
"//third_party/go:go.uber.org__cadence__.gen__go__shared",
"//third_party/go:go.uber.org__cadence__client",
"//third_party/go:go.uber.org__cadence__mocks",
"//third_party/go:go.uber.org__cadence__testsuite",
"//third_party/go:go.uber.org__cadence__workflow",
],
Expand Down
195 changes: 195 additions & 0 deletions pkg/sdk/cadence/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright © 2021 Banzai Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cadence

import (
"context"
"time"

"emperror.dev/errors"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/client"
)

// CronConfiguration encapsulates information about a cron workflow.
type CronConfiguration struct {
CadenceClient client.Client
CronInstanceType CronInstanceType
CronSchedule string
ExecutionStartToCloseTimeout time.Duration
TaskListName string
Workflow string
WorkflowArguments []interface{}
}

// NewCronConfiguration instantiates a cron configuration from the specified
// values.
func NewCronConfiguration(
cadenceClient client.Client,
cronInstanceType CronInstanceType,
cronSchedule string,
executionStartToCloseTimeout time.Duration,
taskListName string,
workflow string,
workflowArguments ...interface{},
) CronConfiguration {
return CronConfiguration{
CadenceClient: cadenceClient,
CronInstanceType: cronInstanceType,
CronSchedule: cronSchedule,
ExecutionStartToCloseTimeout: executionStartToCloseTimeout,
TaskListName: taskListName,
WorkflowArguments: workflowArguments,
Workflow: workflow,
}
}

// CronWorkflowID returns the ID of the cron workflow based on its configuration.
func (cronConfig CronConfiguration) CronWorkflowID() (cronWorkflowID string) {
switch cronConfig.CronInstanceType {
case CronInstanceTypeDomain:
return cronConfig.TaskListName + "-cron-" + cronConfig.Workflow
}

return string(cronConfig.CronInstanceType) + "-cron-" + cronConfig.Workflow
}

// StartCronWorkflow initiates a Cadence cron workflow from the specified type.
//
// WARNING: the current implementation allows only single instance crons (one
// per Cadence server).
func (cronConfig CronConfiguration) StartCronWorkflow(ctx context.Context) (err error) {
state, err := cronConfig.WorkflowState(ctx)
if err != nil {
return errors.Wrap(err, "querying workflow state failed")
}

switch state {
case CronWorkflowStateScheduled: // Note: nothing to do.
return nil
case CronWorkflowStateScheduledOutdated: // Note: restart required.
err = cronConfig.CadenceClient.TerminateWorkflow(
ctx,
cronConfig.CronWorkflowID(),
"",
"cron workflow schedule requires an update",
nil,
)
if err != nil {
return errors.Wrap(err, "terminating cron workflow failed")
}
}

cronWorkflowOptions := client.StartWorkflowOptions{
ID: cronConfig.CronWorkflowID(),
TaskList: cronConfig.TaskListName,
ExecutionStartToCloseTimeout: cronConfig.ExecutionStartToCloseTimeout,
CronSchedule: cronConfig.CronSchedule,
Memo: map[string]interface{}{ // Note: CronSchedule is not directly retrievable (version 0.13.4-0.15.0).
"CronSchedule": cronConfig.CronSchedule,
},
}

_, err = cronConfig.CadenceClient.StartWorkflow(
ctx,
cronWorkflowOptions,
cronConfig.Workflow,
cronConfig.WorkflowArguments...,
)
if err != nil {
return errors.Wrap(err, "starting cron workflow failed")
}

return nil
}

// WorkflowState queries the state of the cron workflow corresponding to the
// cron configuration.
func (cronConfig CronConfiguration) WorkflowState(ctx context.Context) (workflowState CronWorkflowState, err error) {
cronWorkflowDescription, err := cronConfig.CadenceClient.DescribeWorkflowExecution(
ctx,
cronConfig.CronWorkflowID(),
"",
)
if errors.As(err, new(*shared.EntityNotExistsError)) {
return CronWorkflowStateNotScheduled, nil
} else if err != nil {
return CronWorkflowStateUnknown, errors.Wrap(err, "failed to query cron workflow")
} else if cronWorkflowDescription == nil ||
cronWorkflowDescription.WorkflowExecutionInfo == nil {
return CronWorkflowStateUnknown, errors.New("cron workflow execution information not found")
}

executionInfo := cronWorkflowDescription.WorkflowExecutionInfo

closeStatus := executionInfo.GetCloseStatus()
// Note: https://cadenceworkflow.io/docs/go-client/distributed-cron cron
// workflows only stop when cancelled or terminated.
if closeStatus == shared.WorkflowExecutionCloseStatusCanceled ||
closeStatus == shared.WorkflowExecutionCloseStatusTerminated {
return CronWorkflowStateNotScheduled, nil
}

activeCronSchedule := ""
if executionInfo.Memo != nil &&
executionInfo.Memo.Fields["CronSchedule"] != nil {
value := client.NewValue(executionInfo.Memo.Fields["CronSchedule"])
if value.HasValue() {
err = value.Get(&activeCronSchedule)
if err != nil {
return CronWorkflowStateUnknown, errors.Wrap(err, "retrieving cron schedule failed")
}
}
}

if activeCronSchedule != cronConfig.CronSchedule {
return CronWorkflowStateScheduledOutdated, nil
}

return CronWorkflowStateScheduled, nil
}

// CronInstanceType determines how cron instances are treated in case of a
// multiworker environment.
type CronInstanceType string

const (
// CronInstanceTypeDomain specifies only one instance of the scheduled cron
// workflow can exist per Cadence domain.
CronInstanceTypeDomain CronInstanceType = "domain"
)

// CronWorkflowState describes the state of a cron workflow which can be used
// for cron workflow operations.
type CronWorkflowState string

const (
// CronWorkflowStateNotScheduled defines the state when no corresponding
// cron workflow schedule exists.
CronWorkflowStateNotScheduled CronWorkflowState = "not-scheduled"

// CronWorkflowStateScheduled defines the state when the corresponding cron
// workflow is scheduled to run with the latest known schedule.
CronWorkflowStateScheduled CronWorkflowState = "scheduled"

// CronWorkflowStateScheduledOutdated defines the state when the
// corresponding cron workflow is scheduled to run using an outdated
// schedule.
CronWorkflowStateScheduledOutdated CronWorkflowState = "scheduled-outdated"

// CronWorkflowStateUnknown defines the state when no information is
// available about the corresponding cron workflow.
CronWorkflowStateUnknown CronWorkflowState = "unknown"
)
Loading

0 comments on commit 299b85d

Please sign in to comment.