Skip to content

Commit

Permalink
Move workflowstore logic to correct abstraction (flyteorg#496)
Browse files Browse the repository at this point in the history
* Move logic to clear workflow CRD managed fields to passthrough workflowstore

Signed-off-by: Daniel Shuy <[email protected]>

* Move logic to track terminated workflows to new TerminatedTrackingStore workflowstore

Signed-off-by: Daniel Shuy <[email protected]>

Signed-off-by: Daniel Shuy <[email protected]>
Co-authored-by: Dan Rammer <[email protected]>
  • Loading branch information
daniel-shuy and hamersaw authored Oct 24, 2022
1 parent b1b1ae7 commit 26ad857
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 68 deletions.
4 changes: 3 additions & 1 deletion pkg/controller/workflowstore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
PolicyInMemory = "InMemory"
// PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow
PolicyPassThrough = "PassThrough"
// PolicyTrackTerminated tracks terminated workflows
PolicyTrackTerminated = "TrackTerminated"
// PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the inmemory copy
// of the workflow is stale
PolicyResourceVersionCache = "ResourceVersionCache"
Expand All @@ -28,7 +30,7 @@ var (
)

// Config for Workflow access in the controller.
// Various policies are available like - InMemory, PassThrough, ResourceVersionCache
// Various policies are available like - InMemory, PassThrough, TrackTerminated, ResourceVersionCache
type Config struct {
Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"`
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/workflowstore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWor
workflowStore = NewInMemoryWorkflowStore()
case PolicyPassThrough:
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
case PolicyTrackTerminated:
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
workflowStore, err = NewTerminatedTrackingStore(ctx, scope, workflowStore)
case PolicyResourceVersionCache:
workflowStore, err = NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, workflows, lister))
workflowStore = NewPassthroughWorkflowStore(ctx, scope, workflows, lister)
workflowStore, err = NewTerminatedTrackingStore(ctx, scope, workflowStore)
workflowStore = NewResourceVersionCachingStore(ctx, scope, workflowStore)
}

if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/workflowstore/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func (p *passthroughWorkflowStore) UpdateStatus(ctx context.Context, workflow *v

func (p *passthroughWorkflowStore) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
// If the workflow has any managed fields setting the array to one empty ManagedField clears them in the CRD.
// FlyteWorkflow CRDs are only managed by a single FlytePropeller instance and therefore the managed fields paradigm
// does not add useful functionality. Clearing them reduces CRD size, improving etcd I/O performance.
if len(workflow.ObjectMeta.ManagedFields) > 0 {
workflow.ObjectMeta.ManagedFields = workflow.ObjectMeta.ManagedFields[:1]
workflow.ObjectMeta.ManagedFields[0] = v1.ManagedFieldsEntry{}
}

p.metrics.workflowUpdateCount.Inc()
// Something has changed. Lets save
logger.Debugf(ctx, "Observed FlyteWorkflow Update (maybe finalizer)")
Expand Down
37 changes: 2 additions & 35 deletions pkg/controller/workflowstore/resource_version_caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import (

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/fastcheck"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TODO - optimization maybe? we can move this to predicate check, before we add it to the queue?
Expand All @@ -33,7 +30,6 @@ type resourceVersionCaching struct {
w FlyteWorkflow
metrics *resourceVersionMetrics
lastUpdatedResourceVersionCache sync.Map
terminatedFilter fastcheck.Filter
}

func (r *resourceVersionCaching) updateRevisionCache(ctx context.Context, namespace, name, resourceVersion string, isTerminated bool) {
Expand All @@ -58,13 +54,6 @@ func (r *resourceVersionCaching) isResourceVersionSameAsPrevious(ctx context.Con
}

func (r *resourceVersionCaching) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) {
// Check if the resource version key has already been stored in a terminal phase. Processing
// terminated FlyteWorkflows can occur when workflow updates are reported after a workflow
// has already completed.
if r.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name))) {
return nil, ErrWorkflowTerminated
}

w, err := r.w.Get(ctx, namespace, name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -98,25 +87,13 @@ func (r *resourceVersionCaching) UpdateStatus(ctx context.Context, workflow *v1a
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}

if newWF.GetExecutionStatus().IsTerminated() {
r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name)))
}
}

return newWF, nil
}

func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
newWF *v1alpha1.FlyteWorkflow, err error) {
// If the workflow has any managed fields setting the array to one empty ManagedField clears them in the CRD.
// FlyteWorkflow CRDs are only managed by a single FlytePropeller instance and therefore the managed fields paradigm
// does not add useful functionality. Clearing them reduces CRD size, improving etcd I/O performance.
if len(workflow.ObjectMeta.ManagedFields) > 0 {
workflow.ObjectMeta.ManagedFields = workflow.ObjectMeta.ManagedFields[:1]
workflow.ObjectMeta.ManagedFields[0] = metav1.ManagedFieldsEntry{}
}

newWF, err = r.w.Update(ctx, workflow, priorityClass)
if err != nil {
return nil, err
Expand All @@ -130,21 +107,12 @@ func (r *resourceVersionCaching) Update(ctx context.Context, workflow *v1alpha1.
} else {
r.metrics.workflowRedundantUpdatesCount.Inc(ctx)
}

if newWF.GetExecutionStatus().IsTerminated() {
r.terminatedFilter.Add(ctx, []byte(resourceVersionKey(workflow.Namespace, workflow.Name)))
}
}

return newWF, nil
}

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) {
filter, err := fastcheck.NewLRUCacheFilter(1000, scope.NewSubScope("terminated_filter"))
if err != nil {
return nil, err
}

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow {
return &resourceVersionCaching{
w: workflowStore,
metrics: &resourceVersionMetrics{
Expand All @@ -153,6 +121,5 @@ func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, wo
workflowRedundantUpdatesCount: labeled.NewCounter("wf_redundant", "Workflow Update called but ectd. detected no actual update to the workflow.", scope, labeled.EmitUnlabeledMetric),
},
lastUpdatedResourceVersionCache: sync.Map{},
terminatedFilter: filter,
}, nil
}
}
70 changes: 39 additions & 31 deletions pkg/controller/workflowstore/resource_version_caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
)

const (
resourceVersionCachingNamespace = "ns"
resourceVersionCachingName = "name"
)

func init() {
labeled.SetMetricKeys(contextutils.WorkflowIDKey)
}
Expand All @@ -33,24 +38,26 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)

t.Run("notFound", func(t *testing.T) {
l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) {
return nil, kubeerrors.NewNotFound(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), "name")
return nil, kubeerrors.NewNotFound(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), resourceVersionCachingName)
}
w, err := wfStore.Get(ctx, "ns", "name")
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName)
assert.Error(t, err)
assert.True(t, IsNotFound(err))
assert.Nil(t, w)
})

t.Run("alreadyExists?", func(t *testing.T) {
l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) {
return nil, kubeerrors.NewAlreadyExists(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), "name")
return nil, kubeerrors.NewAlreadyExists(v1alpha1.Resource(v1alpha1.FlyteWorkflowKind), resourceVersionCachingName)
}
w, err := wfStore.Get(ctx, "ns", "name")
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName)
assert.Error(t, err)
assert.Nil(t, w)
})
Expand All @@ -59,7 +66,7 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) {
l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) {
return nil, fmt.Errorf("error")
}
w, err := wfStore.Get(ctx, "ns", "name")
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName)
assert.Error(t, err)
assert.Nil(t, w)
})
Expand All @@ -69,7 +76,7 @@ func TestResourceVersionCaching_Get_NotInCache(t *testing.T) {
l.GetCb = func(name string) (*v1alpha1.FlyteWorkflow, error) {
return expW, nil
}
w, err := wfStore.Get(ctx, "ns", "name")
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName)
assert.NoError(t, err)
assert.Equal(t, expW, w)
})
Expand Down Expand Up @@ -139,15 +146,13 @@ func createFakeClientSet() *fake.Clientset {
func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
ctx := context.TODO()

namespace := "ns"
name := "name"
resourceVersion := "r1"

mockClient := createFakeClientSet().FlyteworkflowV1alpha1()

t.Run("Stale", func(t *testing.T) {
staleName := name + ".stale"
wf := dummyWf(namespace, staleName)
staleName := resourceVersionCachingName + ".stale"
wf := dummyWf(resourceVersionCachingNamespace, staleName)
wf.ResourceVersion = resourceVersion

scope := promutils.NewTestScope()
Expand All @@ -163,27 +168,31 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
newWf := wf.DeepCopy()
newWf.Status.Phase = v1alpha1.WorkflowPhaseSucceeding

wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, newWf, PriorityClassCritical)
assert.NoError(t, err)

w, err := wfStore.Get(ctx, namespace, staleName)
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, staleName)
assert.Error(t, err)
assert.False(t, IsNotFound(err))
assert.True(t, IsWorkflowStale(err))
assert.Nil(t, w)
})

t.Run("Updated", func(t *testing.T) {
updatedName := name + ".updated"
wf := dummyWf(namespace, updatedName)
updatedName := resourceVersionCachingName + ".updated"
wf := dummyWf(resourceVersionCachingNamespace, updatedName)
wf.ResourceVersion = resourceVersion

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
Expand All @@ -198,7 +207,7 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
return wf2, nil
}

w, err := wfStore.Get(ctx, namespace, updatedName)
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, updatedName)
assert.NoError(t, err)
assert.NotNil(t, w)
assert.Equal(t, "r2", w.ResourceVersion)
Expand All @@ -207,16 +216,18 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
// If we -mistakenly- attempted to update the store with the exact workflow object, etcd. will not bump the
// resource version. Next read operation should continue to retrieve the same instance of the object.
t.Run("NotUpdated", func(t *testing.T) {
notUpdatedName := name + ".not-updated"
wf := dummyWf(namespace, notUpdatedName)
notUpdatedName := resourceVersionCachingName + ".not-updated"
wf := dummyWf(resourceVersionCachingNamespace, notUpdatedName)
wf.ResourceVersion = resourceVersion

_, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{})
assert.NoError(t, err)

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, wf, PriorityClassCritical)
Expand All @@ -227,7 +238,7 @@ func TestResourceVersionCaching_Get_UpdateAndRead(t *testing.T) {
return wf, nil
}

w, err := wfStore.Get(ctx, namespace, notUpdatedName)
w, err := wfStore.Get(ctx, resourceVersionCachingNamespace, notUpdatedName)
assert.NoError(t, err)
assert.NotNil(t, w)
assert.Equal(t, "r1", w.ResourceVersion)
Expand All @@ -239,11 +250,9 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {

mockClient := createFakeClientSet().FlyteworkflowV1alpha1()

namespace := "ns"
name := "name"
resourceVersion := "r1"

wf := dummyWf(namespace, name)
wf := dummyWf(resourceVersionCachingNamespace, resourceVersionCachingName)
wf.ResourceVersion = resourceVersion

_, err := mockClient.FlyteWorkflows(wf.GetNamespace()).Create(ctx, wf, v1.CreateOptions{})
Expand All @@ -254,14 +263,16 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {

scope := promutils.NewTestScope()
l := &mockWFNamespaceLister{}
wfStore, err := NewResourceVersionCachingStore(ctx, scope, NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l}))
passthroughWfStore := NewPassthroughWorkflowStore(ctx, scope, mockClient, &mockWFLister{V: l})
trackTerminatedWfStore, err := NewTerminatedTrackingStore(ctx, scope, passthroughWfStore)
wfStore := NewResourceVersionCachingStore(ctx, scope, trackTerminatedWfStore)
assert.NoError(t, err)
// Insert a new workflow with R1
_, err = wfStore.Update(ctx, newWf, PriorityClassCritical)
assert.NoError(t, err)

rvStore := wfStore.(*resourceVersionCaching)
v, ok := rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(namespace, name))
v, ok := rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(resourceVersionCachingNamespace, resourceVersionCachingName))
assert.True(t, ok)
assert.Equal(t, resourceVersion, v.(string))

Expand All @@ -270,15 +281,12 @@ func TestResourceVersionCaching_UpdateTerminated(t *testing.T) {
_, err = wfStore.Update(ctx, wf2, PriorityClassCritical)
assert.NoError(t, err)

v, ok = rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(namespace, name))
v, ok = rvStore.lastUpdatedResourceVersionCache.Load(resourceVersionKey(resourceVersionCachingNamespace, resourceVersionCachingName))
assert.False(t, ok)
assert.Nil(t, v)

// validate that terminated workflows are not retrievable
terminated := rvStore.terminatedFilter.Contains(ctx, []byte(resourceVersionKey(namespace, name)))
assert.True(t, terminated)

terminatedWf, err := wfStore.Get(ctx, namespace, name)
terminatedWf, err := wfStore.Get(ctx, resourceVersionCachingNamespace, resourceVersionCachingName)
assert.Nil(t, terminatedWf)
assert.True(t, IsWorkflowTerminated(err))
}
Loading

0 comments on commit 26ad857

Please sign in to comment.