Skip to content

Commit

Permalink
Handle WorkflowClosure from storage (flyteorg#459)
Browse files Browse the repository at this point in the history
* add data reference fields

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* combine crd parts into one object/location

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* create and pass static obj storage to propeller handler

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* read static blob at the beginning
and clear fields before updating the status

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* formatting

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* pass tests

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* remove terminated wfs' blob obj from cache

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* no return on remove blob method

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* test happy offloaded spec scenario

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* set static fields on every streak

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* formatting

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* added crdOffloadStore interface

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* made crdoffloadstore configurable

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* add metrics/formatting

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* load static workflow data outside streak loop

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* cleaned up metric reporting

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* renamed inmemory to active

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* added lruCRDOffloadStore unit tests

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* added activeCRDOffloadStore unit tests

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* added unit test for offloading crd error on handle

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* handle offloaded WorkflowClosure instead of parts of the crd

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* cache wf crd fields instead of wf closure

Signed-off-by: Babis Kiosidis <[email protected]>

* Update pkg/controller/controller.go

Co-authored-by: Ketan Umare <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* reading workflow closure directly from data store rather than cache

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* added prometheus metric

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* updated comments

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>

Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Babis Kiosidis <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Co-authored-by: Babis Kiosidis <[email protected]>
Co-authored-by: Daniel Rammer <[email protected]>
Co-authored-by: Ketan Umare <[email protected]>
  • Loading branch information
4 people authored Aug 17, 2022
1 parent a1c8d74 commit e1649db
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 20 deletions.
5 changes: 5 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type FlyteWorkflow struct {
// so that it can be used downstream without any confusion.
// This field is here because it's easier to put it here than pipe through a new object through all of propeller.
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`

// WorkflowClosureReference is the location containing an offloaded WorkflowClosure. This is used to offload
// portions of the CRD to an external data store to reduce CRD size. If this exists, FlytePropeller must retrieve
// and parse the static data prior to processing.
WorkflowClosureReference DataReference `json:"workflowClosureReference,omitempty"`
}

func (in *FlyteWorkflow) GetSecurityContext() core.SecurityContext {
Expand Down
42 changes: 42 additions & 0 deletions flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,45 @@ func buildConnections(w *core.CompiledWorkflow) v1alpha1.Connections {
res.Upstream = toMapOfLists(w.GetConnections().GetUpstream())
return res
}

type WfClosureCrdFields struct {
*v1alpha1.WorkflowSpec `json:"spec"`
SubWorkflows map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec `json:"subWorkflows,omitempty"`
Tasks map[v1alpha1.TaskID]*v1alpha1.TaskSpec `json:"tasks"`
}

func BuildWfClosureCrdFields(wfClosure *core.CompiledWorkflowClosure) (*WfClosureCrdFields, error) {
errs := errors.NewCompileErrors()
if wfClosure == nil {
errs.Collect(errors.NewValueRequiredErr("root", "wfClosure"))
return nil, errs
}

primarySpec, err := buildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
return nil, errs
}

for _, t := range wfClosure.Tasks {
t.Template.Interface = StripInterfaceTypeMetadata(t.Template.Interface)
}
tasks := buildTasks(wfClosure.Tasks, errs.NewScope())

subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows))
for _, subWf := range wfClosure.SubWorkflows {
spec, err := buildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
} else {
subwfs[subWf.Template.Id.String()] = spec
}
}

wfClosureCrdFields := &WfClosureCrdFields{
WorkflowSpec: primarySpec,
SubWorkflows: subwfs,
Tasks: tasks,
}
return wfClosureCrdFields, nil
}
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, err
}

handler := NewPropellerHandler(ctx, cfg, controller.workflowStore, workflowExecutor, scope)
handler := NewPropellerHandler(ctx, cfg, store, controller.workflowStore, workflowExecutor, scope)
controller.workerPool = NewWorkerPool(ctx, scope, workQ, handler)

if cfg.EnableGrpcLatencyMetrics {
Expand Down
61 changes: 55 additions & 6 deletions flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ import (
"runtime/debug"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

eventsErr "github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/prometheus/client_golang/prometheus"
)

// TODO Lets move everything to use controller runtime
Expand All @@ -35,6 +37,7 @@ type propellerMetrics struct {
PanicObserved labeled.Counter
RoundSkipped prometheus.Counter
WorkflowNotFound prometheus.Counter
WorkflowClosureReadTime labeled.StopWatch
StreakLength labeled.Counter
RoundTime labeled.StopWatch
}
Expand All @@ -50,6 +53,7 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
PanicObserved: labeled.NewCounter("panic", "Panic during handling or aborting workflow", roundScope, labeled.EmitUnlabeledMetric),
RoundSkipped: roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"),
WorkflowNotFound: roundScope.MustNewCounter("not_found", "workflow not found in the cache"),
WorkflowClosureReadTime: labeled.NewStopWatch("closure_read", "Total time taken to read and parse the offloaded WorkflowClosure", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric),
StreakLength: labeled.NewCounter("streak_length", "Number of consecutive rounds used in fast follow mode", roundScope, labeled.EmitUnlabeledMetric),
RoundTime: labeled.NewStopWatch("round_time", "Total time taken by one round traversing, copying and storing a workflow", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric),
}
Expand All @@ -67,6 +71,7 @@ func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWork

// Core Propeller structure that houses the Reconciliation loop for Flytepropeller
type Propeller struct {
store *storage.DataStore
wfStore workflowstore.FlyteWorkflow
workflowExecutor executors.Workflow
metrics *propellerMetrics
Expand Down Expand Up @@ -192,6 +197,32 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
return nil
}
}

// if the FlyteWorkflow CRD has the WorkflowClosureReference set then we have offloaded the
// static fields to the blobstore to reduce CRD size. we must read and parse the workflow
// closure so that these fields may be temporarily repopulated.
var wfClosureCrdFields *k8s.WfClosureCrdFields
if len(w.WorkflowClosureReference) > 0 {
t := p.metrics.WorkflowClosureReadTime.Start(ctx)

wfClosure := &admin.WorkflowClosure{}
err := p.store.ReadProtobuf(ctx, w.WorkflowClosureReference, wfClosure)
if err != nil {
t.Stop()
logger.Errorf(ctx, "Failed to retrieve workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err)
return err
}

wfClosureCrdFields, err = k8s.BuildWfClosureCrdFields(wfClosure.CompiledWorkflow)
if err != nil {
t.Stop()
logger.Errorf(ctx, "Failed to parse workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err)
return err
}

t.Stop()
}

streak := 0
defer p.metrics.StreakLength.Add(ctx, float64(streak))

Expand All @@ -201,8 +232,25 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
}

for streak = 0; streak < maxLength; streak++ {
// if the wfClosureCrdFields struct is not nil then it contains static workflow data which
// has been offloaded to the blobstore. we must set these fields so they're available
// during workflow processing and immediately remove them afterwards so they do not
// accidentally get written to the workflow store once the new state is stored.
if wfClosureCrdFields != nil {
w.WorkflowSpec = wfClosureCrdFields.WorkflowSpec
w.Tasks = wfClosureCrdFields.Tasks
w.SubWorkflows = wfClosureCrdFields.SubWorkflows
}

t := p.metrics.RoundTime.Start(ctx)
mutatedWf, err := p.TryMutateWorkflow(ctx, w)

if wfClosureCrdFields != nil {
// strip data populated from WorkflowClosureReference
w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil
mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil
}

if err != nil {
// NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations
// We only want to increase failed attempts and discard any other partial changes to the CRD.
Expand Down Expand Up @@ -319,11 +367,12 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
}

// NewPropellerHandler creates a new Propeller and initializes metrics
func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller {
func NewPropellerHandler(_ context.Context, cfg *config.Config, store *storage.DataStore, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller {

metrics := newPropellerMetrics(scope)
return &Propeller{
metrics: metrics,
store: store,
wfStore: wfStore,
workflowExecutor: executor,
cfg: cfg,
Expand Down
100 changes: 87 additions & 13 deletions flytepropeller/pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@ import (
"testing"
"time"

"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventErrors "github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
workflowErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/storage"
storagemocks "github.com/flyteorg/flytestdlib/storage/mocks"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type mockExecutor struct {
Expand Down Expand Up @@ -50,7 +53,7 @@ func TestPropeller_Handle(t *testing.T) {
MaxWorkflowRetries: 0,
}

p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)

const namespace = "test"
const name = "123"
Expand All @@ -62,7 +65,7 @@ func TestPropeller_Handle(t *testing.T) {
scope := promutils.NewTestScope()
s := &mocks.FlyteWorkflow{}
exec := &mockExecutor{}
p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)
s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrStaleWorkflowError, "stale")).Once()
assert.NoError(t, p.Handle(ctx, namespace, name))
})
Expand Down Expand Up @@ -537,7 +540,7 @@ func TestPropeller_Handle_TurboMode(t *testing.T) {
const namespace = "test"
const name = "123"

p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)

t.Run("error", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
Expand Down Expand Up @@ -739,7 +742,7 @@ func TestPropellerHandler_Initialize(t *testing.T) {
MaxWorkflowRetries: 0,
}

p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)

assert.NoError(t, p.Initialize(ctx))
}
Expand All @@ -757,7 +760,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
scope := promutils.NewTestScope()
s := &mocks.FlyteWorkflow{}
exec := &mockExecutor{}
p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)
wf := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Expand All @@ -778,7 +781,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
scope := promutils.NewTestScope()
s := &mocks.FlyteWorkflow{}
exec := &mockExecutor{}
p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)
wf := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Expand All @@ -799,7 +802,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
scope := promutils.NewTestScope()
s := &mocks.FlyteWorkflow{}
exec := &mockExecutor{}
p := NewPropellerHandler(ctx, cfg, s, exec, scope)
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)
wf := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Expand All @@ -821,3 +824,74 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
assert.NoError(t, err)
})
}

func TestPropellerHandler_OffloadedWorkflowClosure(t *testing.T) {
ctx := context.TODO()

const name = "123"
const namespace = "test"

s := workflowstore.NewInMemoryWorkflowStore()
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
WorkflowClosureReference: "some-file-location",
}))

exec := &mockExecutor{}
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSucceeding, "done", nil)
return nil
}

cfg := &config.Config{
MaxWorkflowRetries: 0,
}

t.Run("Happy", func(t *testing.T) {
scope := promutils.NewTestScope()

protoStore := &storagemocks.ComposedProtobufStore{}
protoStore.OnReadProtobufMatch(mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
// populate mock CompiledWorkflowClosure that satisfies just enough to compile
wfClosure := args.Get(2)
assert.NotNil(t, wfClosure)
casted := wfClosure.(*admin.WorkflowClosure)
casted.CompiledWorkflow = &core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Id: &core.Identifier{},
},
},
}
}).Return(nil)
dataStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, protoStore)
p := NewPropellerHandler(ctx, cfg, dataStore, s, exec, scope)

assert.NoError(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 1, len(r.Finalizers))
assert.False(t, HasCompletedLabel(r))
assert.Equal(t, uint32(0), r.Status.FailedAttempts)
assert.Nil(t, r.WorkflowSpec)
assert.Nil(t, r.SubWorkflows)
assert.Nil(t, r.Tasks)
})

t.Run("Error", func(t *testing.T) {
scope := promutils.NewTestScope()

protoStore := &storagemocks.ComposedProtobufStore{}
protoStore.OnReadProtobufMatch(mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("foo"))
dataStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, protoStore)
p := NewPropellerHandler(ctx, cfg, dataStore, s, exec, scope)

err := p.Handle(ctx, namespace, name)
assert.Error(t, err)
})
}
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/workflowstore/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus"

kubeerrors "k8s.io/apimachinery/pkg/api/errors"
)

Expand Down

0 comments on commit e1649db

Please sign in to comment.