Skip to content

Commit

Permalink
Add option to send raw output data in events (flyteorg#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Aug 26, 2021
1 parent 9378444 commit 1fc56b7
Show file tree
Hide file tree
Showing 39 changed files with 1,261 additions and 151 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.19.19
github.com/flyteorg/flyteidl v0.19.22
github.com/flyteorg/flyteplugins v0.5.69
github.com/flyteorg/flytestdlib v0.3.34
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.19 h1:jv93YLz0Bq++sH9r0AOhdNaHFdXSCWjsXJoLOIduA2o=
github.com/flyteorg/flyteidl v0.19.19/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.22 h1:e3M0Dob/r5n+AJfAByzad/svMAVes7XfZVxUNCi6AeQ=
github.com/flyteorg/flyteidl v0.19.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.69 h1:i1V1n+uoI5TrBG/UWD6vzJ/fFAtru9FSYbjCnYBttUc=
github.com/flyteorg/flyteplugins v0.5.69/go.mod h1:YjahYP+i4/Qn+dFvxMOGbhDtkQT4EiH4Kb88KNK505A=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
Expand Down
19 changes: 19 additions & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ var (
MetadataPrefix: "metadata/propeller",
EnableAdminLauncher: true,
MetricsPrefix: "flyte",
EventConfig: EventConfig{
RawOutputPolicy: RawOutputPolicyReference,
},
}
)

Expand Down Expand Up @@ -133,6 +136,7 @@ type Config struct {
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."`
EventConfig EventConfig `json:"event-config,omitempty" pflag:",Configures execution event behavior."`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down Expand Up @@ -215,6 +219,21 @@ type LeaderElectionConfig struct {
RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."`
}

// Defines how output data should be passed along in execution events.
type RawOutputPolicy = string

const (
// Only send output data as a URI referencing where outputs have been uploaded
RawOutputPolicyReference RawOutputPolicy = "reference"
// Send raw output data in events.
RawOutputPolicyInline RawOutputPolicy = "inline"
)

type EventConfig struct {
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
}

// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,12 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), scope)
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), &cfg.EventConfig, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, scope)
workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, scope)
if err != nil {
return nil, err
}
Expand Down
50 changes: 50 additions & 0 deletions flytepropeller/pkg/controller/events/mocks/node_event_recorder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions flytepropeller/pkg/controller/events/mocks/task_event_recorder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions flytepropeller/pkg/controller/events/node_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package events

import (
"context"
"strings"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteidl/clients/go/events"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

//go:generate mockery -all -output=mocks -case=underscore

// Recorder for Node events
type NodeEventRecorder interface {
// Records node execution events indicating the node has undergone a phase change and additional metadata.
RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error
}

type nodeEventRecorder struct {
eventRecorder events.NodeEventRecorder
store *storage.DataStore
}

// In certain cases, a successful node execution event can be configured to include raw output data inline. However,
// for large outputs these events may exceed the event recipient's message size limit, so we fallback to passing
// the offloaded output URI instead.
func (r *nodeEventRecorder) handleFailure(ctx context.Context, ev *event.NodeExecutionEvent, err error) error {
st, ok := status.FromError(err)
if !ok || st.Code() != codes.ResourceExhausted {
// Error was not a status error
return err
}
if !strings.HasPrefix(st.Message(), "message too large") {
return err
}

// This time, we attempt to record the event with the output URI set.
return r.eventRecorder.RecordNodeEvent(ctx, ev)
}

func (r *nodeEventRecorder) RecordNodeEvent(ctx context.Context, ev *event.NodeExecutionEvent, eventConfig *config.EventConfig) error {
var origEvent = ev
var rawOutputPolicy = eventConfig.RawOutputPolicy
if rawOutputPolicy == config.RawOutputPolicyInline && len(ev.GetOutputUri()) > 0 {
outputs := &core.LiteralMap{}
err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs)
if err != nil {
// Fall back to forwarding along outputs by reference when we can't fetch them.
logger.Warnf(ctx, "failed to fetch outputs by ref [%s] to send inline with err: %v", ev.GetOutputUri(), err)
rawOutputPolicy = config.RawOutputPolicyReference
} else {
origEvent = proto.Clone(ev).(*event.NodeExecutionEvent)
ev.OutputResult = &event.NodeExecutionEvent_OutputData{
OutputData: outputs,
}
}
}

err := r.eventRecorder.RecordNodeEvent(ctx, ev)
if err != nil {
logger.Infof(ctx, "Failed to record node event [%+v] with err: %v", ev, err)
// Only attempt to retry sending an event in the case we tried to send raw output data inline
if eventConfig.FallbackToOutputReference && rawOutputPolicy == config.RawOutputPolicyInline {
logger.Infof(ctx, "Falling back to sending node event outputs by reference for [%+v]", ev.Id)
return r.handleFailure(ctx, origEvent, err)
}
return err
}
return nil
}

func NewNodeEventRecorder(eventSink events.EventSink, scope promutils.Scope, store *storage.DataStore) NodeEventRecorder {
return &nodeEventRecorder{
eventRecorder: events.NewNodeEventRecorder(eventSink, scope),
store: store,
}
}
Loading

0 comments on commit 1fc56b7

Please sign in to comment.