Skip to content

Commit

Permalink
Fix and re-enable resource revision cache (flyteorg#53)
Browse files Browse the repository at this point in the history
- [X] Introduce a config-based factory method to control the workflow store type.
- [X] Change WorkflowStore interface to return the committed version of the workflow object from etcd.
- [X] Change resource version caching to only occur when the etcd update operation results in a resource version change.
- [X] Fix unit tests to simulate updating resource versions.
  • Loading branch information
EngHabu authored Jan 7, 2020
1 parent a79c03f commit c627551
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 60 deletions.
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ propeller:
capacity: 1000
kube-config: "$HOME/.kube/config"
publish-k8s-events: true
workflowStore:
policy: "ResourceVersionCache"
tasks:
task-plugins:
enabled-plugins:
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -154,3 +155,11 @@ func TestCustomState_DeepCopy(t *testing.T) {
assert.Equal(t, 1, len(*out))
})
}

// TestWorkflowStatus_Deserialize verifies that a minimal serialized status
// payload can be unmarshalled into a NodeStatus without producing an error.
func TestWorkflowStatus_Deserialize(t *testing.T) {
	const payload = `{"phase":0,"dataDir":"/blah/bloh.pb","attempts":0,"cached":false}`

	var status NodeStatus
	assert.NoError(t, json.Unmarshal([]byte(payload), &status))
}
8 changes: 7 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controller
import (
"context"

errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
stdErrs "github.com/lyft/flytestdlib/errors"

"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog"

Expand Down Expand Up @@ -289,7 +292,10 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
}
controller.workQueue = workQ

controller.workflowStore = workflowstore.NewPassthroughWorkflowStore(ctx, scope, flytepropellerClientset.FlyteworkflowV1alpha1(), flyteworkflowInformer.Lister())
controller.workflowStore, err = workflowstore.NewWorkflowStore(ctx, workflowstore.GetConfig(), flyteworkflowInformer.Lister(), flytepropellerClientset.FlyteworkflowV1alpha1(), scope)
if err != nil {
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.DefaultDeadlines, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
return p.wfStore.Update(ctx, wfDeepCopy, workflowstore.PriorityClassCritical)
_, err = p.wfStore.Update(ctx, wfDeepCopy, workflowstore.PriorityClassCritical)
return err
}

func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, w v1alpha1.Exe
// StartNode is special. It does not have any processing step. It just takes the workflow (or subworkflow) inputs and converts to its own outputs
nodeStatus := w.GetNodeExecutionStatus(ctx, startNode.GetID())

if nodeStatus.GetDataDir() == "" {
if len(nodeStatus.GetDataDir()) == 0 {
return executors.NodeStatusUndefined, errors.Errorf(errors.IllegalStateError, startNode.GetID(), "no data-dir set, cannot store inputs")
}

Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/workflowstore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package workflowstore

import (
ctrlConfig "github.com/lyft/flytepropeller/pkg/controller/config"
)

//go:generate pflags Config --default-var=defaultConfig

// Policy identifies which workflow store implementation should be constructed.
type Policy string

// Supported workflow store policies. They are declared as untyped string
// constants, so they can be used wherever a Policy or a plain string is expected.
const (
// PolicyInMemory selects the in-memory workflow store.
PolicyInMemory = "InMemory"
// PolicyPassThrough selects the store that passes calls through to the workflow client and lister.
PolicyPassThrough = "PassThrough"
// PolicyResourceVersionCache selects the pass-through store wrapped by the resource-version caching store.
PolicyResourceVersionCache = "ResourceVersionCache"
)

var (
// defaultConfig is the configuration the section is registered with; it selects
// the pass-through policy.
defaultConfig = &Config{
Policy: PolicyPassThrough,
}

// configSection is the "workflowStore" sub-section registered on the controller
// config, seeded with defaultConfig. Note that a *Config (pointer) is registered,
// which is what GetConfig type-asserts to.
configSection = ctrlConfig.MustRegisterSubSection("workflowStore", defaultConfig)
)

// Config holds the settings that decide which workflow store implementation is created.
type Config struct {
// Policy names the workflow store implementation to build (see the Policy* constants).
Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"`
}

// GetConfig returns the workflow store configuration currently held by the
// registered config section. It panics if the section does not hold a *Config.
func GetConfig() *Config {
	current := configSection.GetConfig()
	return current.(*Config)
}

// SetConfig replaces the workflow store configuration held by the registered
// config section and returns any error reported by the section.
//
// The pointer is stored as-is (not dereferenced): the section is registered with
// a *Config and GetConfig type-asserts the stored value to *Config, so storing a
// Config value here would make every later GetConfig call panic. Passing the
// pointer through also avoids a nil-pointer dereference when cfg is nil.
func SetConfig(cfg *Config) error {
	return configSection.SetConfig(cfg)
}
25 changes: 25 additions & 0 deletions pkg/controller/workflowstore/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package workflowstore

import (
"context"
"fmt"

flyteworkflowv1alpha1 "github.com/lyft/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/lyft/flytestdlib/promutils"
)

// NewWorkflowStore builds the FlyteWorkflow store selected by cfg.Policy.
//
//   - PolicyInMemory returns the in-memory store.
//   - PolicyPassThrough returns a store backed by the given workflow client and lister.
//   - PolicyResourceVersionCache returns the pass-through store wrapped by the
//     resource-version caching store.
//
// An error is returned when cfg is nil, when the policy is empty, or when the
// policy is not one of the supported values.
func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWorkflowLister,
	workflows flyteworkflowv1alpha1.FlyteworkflowV1alpha1Interface, scope promutils.Scope) (FlyteWorkflow, error) {

	// Guard against a missing config instead of panicking on cfg.Policy below.
	if cfg == nil {
		return nil, fmt.Errorf("empty workflow store config")
	}

	switch cfg.Policy {
	case PolicyInMemory:
		return NewInMemoryWorkflowStore(), nil
	case PolicyPassThrough:
		return NewPassthroughWorkflowStore(ctx, scope, workflows, lister), nil
	case PolicyResourceVersionCache:
		return NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, workflows, lister)), nil
	case "":
		return nil, fmt.Errorf("empty workflow store config")
	default:
		// A non-empty but unknown policy is a configuration mistake (e.g. a typo);
		// report the offending value rather than claiming the config is empty.
		return nil, fmt.Errorf("unsupported workflow store policy [%v]", cfg.Policy)
	}
}
6 changes: 4 additions & 2 deletions pkg/controller/workflowstore/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (

type FlyteWorkflow interface {
Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error)
UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error
Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error
UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error)
Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error)
}
14 changes: 9 additions & 5 deletions pkg/controller/workflowstore/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,26 @@ func (i *InmemoryWorkflowStore) Get(ctx context.Context, namespace, name string)
return nil, errWorkflowNotFound
}

func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error {
func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
if w != nil {
if w.Name != "" && w.Namespace != "" {
if m, ok := i.store[w.Namespace]; ok {
if _, ok := m[w.Name]; ok {
m[w.Name] = w
return nil
return w, nil
}
}
return nil

return nil, kubeerrors.NewNotFound(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), w.Name)
}
}
return kubeerrors.NewBadRequest("Workflow object with Namespace & Name is required")

return nil, kubeerrors.NewBadRequest("Workflow object with Namespace & Name is required")
}

func (i *InmemoryWorkflowStore) Update(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error {
func (i *InmemoryWorkflowStore) Update(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
return i.UpdateStatus(ctx, w, priorityClass)
}

Expand Down
23 changes: 13 additions & 10 deletions pkg/controller/workflowstore/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,50 +41,53 @@ func (p *passthroughWorkflowStore) Get(ctx context.Context, namespace, name stri
return w, nil
}

func (p *passthroughWorkflowStore) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error {
func (p *passthroughWorkflowStore) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
p.metrics.workflowUpdateCount.Inc()
// Something has changed. Lets save
logger.Debugf(ctx, "Observed FlyteWorkflow State change. [%v] -> [%v]", workflow.Status.Phase.String(), workflow.Status.Phase.String())
t := p.metrics.workflowUpdateLatency.Start()
_, err := p.wfClientSet.FlyteWorkflows(workflow.Namespace).Update(workflow)
newWF, err = p.wfClientSet.FlyteWorkflows(workflow.Namespace).Update(workflow)
if err != nil {
if kubeerrors.IsNotFound(err) {
return nil
return nil, nil
}

if kubeerrors.IsConflict(err) {
p.metrics.workflowUpdateConflictCount.Inc()
}
p.metrics.workflowUpdateFailedCount.Inc()
logger.Errorf(ctx, "Failed to update workflow status. Error [%v]", err)
return err
return nil, err
}
t.Stop()
p.metrics.workflowUpdateSuccessCount.Inc()
logger.Debugf(ctx, "Updated workflow status.")
return nil
return newWF, nil
}

func (p *passthroughWorkflowStore) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error {
func (p *passthroughWorkflowStore) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
p.metrics.workflowUpdateCount.Inc()
// Something has changed. Lets save
logger.Debugf(ctx, "Observed FlyteWorkflow Update (maybe finalizer)")
t := p.metrics.workflowUpdateLatency.Start()
_, err := p.wfClientSet.FlyteWorkflows(workflow.Namespace).Update(workflow)
newWF, err = p.wfClientSet.FlyteWorkflows(workflow.Namespace).Update(workflow)
if err != nil {
if kubeerrors.IsNotFound(err) {
return nil
return nil, nil
}
if kubeerrors.IsConflict(err) {
p.metrics.workflowUpdateConflictCount.Inc()
}
p.metrics.workflowUpdateFailedCount.Inc()
logger.Errorf(ctx, "Failed to update workflow. Error [%v]", err)
return err
return nil, err
}
t.Stop()
p.metrics.workflowUpdateSuccessCount.Inc()
logger.Debugf(ctx, "Updated workflow.")
return nil
return newWF, nil
}

func NewPassthroughWorkflowStore(_ context.Context, scope promutils.Scope, wfClient v1alpha12.FlyteworkflowV1alpha1Interface,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/workflowstore/passthrough_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestPassthroughWorkflowStore_UpdateStatus(t *testing.T) {
const namespace = "test-ns"
t.Run("notFound", func(t *testing.T) {
wf := dummyWf(namespace, "x")
err := wfStore.UpdateStatus(ctx, wf, PriorityClassCritical)
_, err := wfStore.UpdateStatus(ctx, wf, PriorityClassCritical)
assert.NoError(t, err)
updated, err := mockClient.FlyteWorkflows(namespace).Get("x", v1.GetOptions{})
assert.Error(t, err)
Expand All @@ -119,7 +119,7 @@ func TestPassthroughWorkflowStore_UpdateStatus(t *testing.T) {
if assert.NoError(t, err) {
assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, updated.GetExecutionStatus().GetPhase())
wf.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseFailed, "")
err := wfStore.UpdateStatus(ctx, wf, PriorityClassCritical)
_, err := wfStore.UpdateStatus(ctx, wf, PriorityClassCritical)
assert.NoError(t, err)
newVal, err := n.Get("x", v1.GetOptions{})
assert.NoError(t, err)
Expand Down
73 changes: 51 additions & 22 deletions pkg/controller/workflowstore/resource_version_caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import (
"fmt"
"sync"

"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
)

// TODO - optimization maybe? we can move this to predicate check, before we add it to the queue?
type resourceVersionMetrics struct {
workflowStaleCount prometheus.Counter
workflowEvictedCount prometheus.Counter
workflowStaleCount labeled.Counter
workflowEvictedCount labeled.Counter
workflowRedundantUpdatesCount labeled.Counter
}

// Simple function that converts the namespace and name to a string
Expand All @@ -30,23 +32,24 @@ type resourceVersionCaching struct {
lastUpdatedResourceVersionCache sync.Map
}

func (r *resourceVersionCaching) updateRevisionCache(_ context.Context, namespace, name, resourceVersion string, isTerminated bool) {
func (r *resourceVersionCaching) updateRevisionCache(ctx context.Context, namespace, name, resourceVersion string, isTerminated bool) {
if isTerminated {
r.metrics.workflowEvictedCount.Inc()
r.metrics.workflowEvictedCount.Inc(ctx)
r.lastUpdatedResourceVersionCache.Delete(resourceVersionKey(namespace, name))
} else {
r.lastUpdatedResourceVersionCache.Store(resourceVersionKey(namespace, name), resourceVersion)
}
}

func (r *resourceVersionCaching) isResourceVersionSameAsPrevious(namespace, name, resourceVersion string) bool {
func (r *resourceVersionCaching) isResourceVersionSameAsPrevious(ctx context.Context, namespace, name, resourceVersion string) bool {
if v, ok := r.lastUpdatedResourceVersionCache.Load(resourceVersionKey(namespace, name)); ok {
strV := v.(string)
if strV == resourceVersion {
r.metrics.workflowStaleCount.Inc()
r.metrics.workflowStaleCount.Inc(ctx)
return true
}
}

return false
}

Expand All @@ -55,41 +58,67 @@ func (r *resourceVersionCaching) Get(ctx context.Context, namespace, name string
if err != nil {
return nil, err
}

if w != nil {
if r.isResourceVersionSameAsPrevious(namespace, name, w.ResourceVersion) {
if r.isResourceVersionSameAsPrevious(ctx, namespace, name, w.ResourceVersion) {
return nil, errStaleWorkflowError
}
}

return w, nil
}

func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error {
err := r.w.UpdateStatus(ctx, workflow, priorityClass)
func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
newWF, err = r.w.UpdateStatus(ctx, workflow, priorityClass)
if err != nil {
return err
return nil, err
}

r.updateRevisionCache(ctx, workflow.Namespace, workflow.Name, workflow.ResourceVersion, workflow.Status.IsTerminated())
return nil
if newWF != nil {
// If the update succeeded AND a resource version has changed (indicating the new WF was actually changed),
// cache the old. The behavior this code is trying to accomplish is this. Normally, if the CRD has not changed,
// the code will look at the workflow at the normal frequency. As soon as something has changed, and we get
// confirmation that we have written the newer workflow to the api server, and receive a different ResourceVersion,
// we cache the old ResourceVersion number. This means that we will never process that exact version again
// (as long as the cache is up) thus saving us from things like sending duplicate events.
if newWF.ResourceVersion != workflow.ResourceVersion {
r.updateRevisionCache(ctx, workflow.Namespace, workflow.Name, workflow.ResourceVersion, workflow.Status.IsTerminated())
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}
}

return newWF, nil
}

func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) error {
err := r.w.Update(ctx, workflow, priorityClass)
func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
newWF, err = r.w.Update(ctx, workflow, priorityClass)
if err != nil {
return err
return nil, err
}

r.updateRevisionCache(ctx, workflow.Namespace, workflow.Name, workflow.ResourceVersion, workflow.Status.IsTerminated())
return nil
}
if newWF != nil {
// If the update succeeded AND a resource version has changed (indicating the new WF was actually changed),
// cache the old
if newWF.ResourceVersion != workflow.ResourceVersion {
r.updateRevisionCache(ctx, workflow.Namespace, workflow.Name, workflow.ResourceVersion, workflow.Status.IsTerminated())
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}
}

func NewResourceVersionCachingStore(ctx context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow {
return newWF, nil
}

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow {
return &resourceVersionCaching{
w: workflowStore,
metrics: &resourceVersionMetrics{
workflowStaleCount: scope.MustNewCounter("wf_stale", "Found stale workflow in cache"),
workflowEvictedCount: scope.MustNewCounter("wf_evict", "removed workflow from resource version cache"),
workflowStaleCount: labeled.NewCounter("wf_stale", "Found stale workflow in cache", scope, labeled.EmitUnlabeledMetric),
workflowEvictedCount: labeled.NewCounter("wf_evict", "Removed workflow from resource version cache", scope, labeled.EmitUnlabeledMetric),
workflowRedundantUpdatesCount: labeled.NewCounter("wf_redundant", "Workflow Update called but ectd. detected no actual update to the workflow.", scope, labeled.EmitUnlabeledMetric),
},
lastUpdatedResourceVersionCache: sync.Map{},
}
Expand Down
Loading

0 comments on commit c627551

Please sign in to comment.