This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Pass along enhanced task events. #244

Merged: 18 commits, Mar 24, 2021
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
@@ -4,7 +4,7 @@ package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/flyteorg/flytestdlib/cli/pflags"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/vektra/mockery/cmd/mockery"
)
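For context on this file: blank-importing a tool's package is the standard Go "tools.go" trick, which forces go.mod to pin the tool's version so everyone generates code with the same binaries; the change above adds flytestdlib's pflags generator to that list. A minimal sketch of the convention (the build tag is an assumption about this file, it is not visible in the hunk):

// +build tools

package tools

import (
	// Blank imports keep these tools in go.mod through `go mod tidy`,
	// so invoking them (e.g. via `go run`) uses the pinned versions.
	_ "github.com/flyteorg/flytestdlib/cli/pflags"
)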
4 changes: 2 additions & 2 deletions go.mod
@@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.20
github.com/flyteorg/flyteplugins v0.5.38
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flyteplugins v0.5.39-0.20210322202107-51b9713edaba
Contributor Author commented:
will update before merging

github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
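The comment above refers to the flyteplugins requirement: v0.5.39-0.20210322202107-51b9713edaba is a Go pseudo-version, pinning an untagged commit rather than a release. A rough guide to reading it (the command shown is illustrative, not from this PR):

// A pseudo-version has the shape <next-version>-<UTC timestamp>-<12-char commit>,
// and is typically produced by requesting a commit directly:
//
//   go get github.com/flyteorg/flyteplugins@51b9713edaba
//
// It is usually swapped for a tagged release before merging, which is what
// the author is promising here.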
13 changes: 8 additions & 5 deletions go.sum
@@ -72,6 +72,7 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
@@ -230,11 +231,12 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs=
github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38-0.20210322195434-e97acce4a543 h1:bRWTErtcF5llDdbg08/JJYv7Lxog1Vo+w2LWqOPCzF0=
github.com/flyteorg/flyteplugins v0.5.38-0.20210322195434-e97acce4a543/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flyteplugins v0.5.39-0.20210322202107-51b9713edaba h1:+h7TjqLY78Ht3oQj4rIOOTvAJL0BeUlLLiqpxkEFTxE=
github.com/flyteorg/flyteplugins v0.5.39-0.20210322202107-51b9713edaba/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
@@ -1229,6 +1231,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
2 changes: 1 addition & 1 deletion pkg/apis/flyteworkflow/v1alpha1/iface.go
@@ -111,7 +111,7 @@ const (
// when handling a workflow event which might yield to seemingly random failures. This phase ensures we are handling,
// and only so, the failure node until it's done executing or it fails itself.
// If a failure node fails to execute (a real possibility), the final failure output of the workflow will only include
// its failure reason. In other words, its failure will mask the original failure for the workflow. It's imperative
// its failure reason. InputReader other words, its failure will mask the original failure for the workflow. It's imperative
// failure nodes should be very simple, very resilient and very well tested.
WorkflowPhaseHandlingFailureNode
)
2 changes: 1 addition & 1 deletion pkg/apis/flyteworkflow/v1alpha1/node_status.go
@@ -208,7 +208,7 @@ type NodeStatus struct {

TaskNodeStatus *TaskNodeStatus `json:",omitempty"`
DynamicNodeStatus *DynamicNodeStatus `json:"dynamicNodeStatus,omitempty"`
// In case of Failing/Failed Phase, an execution error can be optionally associated with the Node
// InputReader case of Failing/Failed Phase, an execution error can be optionally associated with the Node
Error *ExecutionError `json:"error,omitempty"`

// Not Persisted
2 changes: 1 addition & 1 deletion pkg/compiler/validators/typing.go
@@ -77,7 +77,7 @@ func (t collectionTypeChecker) CastsFrom(upstreamType *flyte.LiteralType) bool {
// Schemas are more complex types in the Flyte ecosystem. A schema is considered castable in the following
// cases.
//
// 1. The downstream schema has no column types specified. In such a case, it accepts all schema input since it is
// 1. The downstream schema has no column types specified. InputReader such a case, it accepts all schema input since it is
// generic.
//
// 2. The downstream schema has a subset of the upstream columns and they match perfectly.
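The two rules in this comment can be illustrated with a simplified checker; the types below are stand-ins, not the compiler's real LiteralType API:

// Sketch of schema castability (illustrative types, assumed names).
type columnSketch struct{ Name, Type string }

func schemaCastsFromSketch(downstream, upstream []columnSketch) bool {
	// Rule 1: a downstream schema with no columns is generic and accepts anything.
	if len(downstream) == 0 {
		return true
	}
	// Rule 2: every downstream column must exist upstream with an identical type.
	upstreamCols := make(map[string]string, len(upstream))
	for _, c := range upstream {
		upstreamCols[c.Name] = c.Type
	}
	for _, c := range downstream {
		if t, ok := upstreamCols[c.Name]; !ok || t != c.Type {
			return false
		}
	}
	return true
}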
4 changes: 2 additions & 2 deletions pkg/controller/composite_workqueue.go
@@ -14,9 +14,9 @@ import (
)

// A CompositeWorkQueue can be used in cases where the work is enqueued by two sources. It can be enqueued by either
// 1. Informer for the Primary Object itself. In case of FlytePropeller, this is the workflow object
// 1. Informer for the Primary Object itself. InputReader case of FlytePropeller, this is the workflow object
Contributor commented:
Unintended renames?

Contributor Author replied:
goland you're 2 clever

// 2. Informer or any other process that enqueues the top-level object for re-evaluation in response to one of the
// sub-objects being ready. In the case of FlytePropeller this is the "Node/Task" updates, will re-enqueue the workflow
// sub-objects being ready. InputReader the case of FlytePropeller this is the "Node/Task" updates, will re-enqueue the workflow
// to be re-evaluated
type CompositeWorkQueue interface {
workqueue.RateLimitingInterface
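To make the two enqueue paths concrete, here is a minimal sketch of a composite queue in the same spirit; the names and structure are illustrative, not FlytePropeller's actual implementation:

import "k8s.io/client-go/util/workqueue"

// simpleCompositeQueue embeds the primary queue (fed by the workflow informer)
// and carries a secondary queue fed by node/task updates; both ultimately
// yield workflow keys to the same processing loop.
type simpleCompositeQueue struct {
	workqueue.RateLimitingInterface
	sub workqueue.RateLimitingInterface
}

// AddToSub re-enqueues a workflow key because one of its sub-objects is ready.
func (q *simpleCompositeQueue) AddToSub(key interface{}) { q.sub.Add(key) }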
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
@@ -164,7 +164,7 @@ func (c *Controller) getWorkflowUpdatesHandler() cache.ResourceEventHandler {
DeleteFunc: func(obj interface{}) {
// There is a corner case where the obj is not in fact a valid resource (it sends a DeletedFinalStateUnknown
// object instead) - it has to do with missing some event that leads to not knowing the final state of the
// resource. In which case, we can't use the regular metaAccessor to read obj name/namespace but should
// resource. InputReader which case, we can't use the regular metaAccessor to read obj name/namespace but should
// instead use cache.DeletionHandling* helper functions that know how to deal with that.

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
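For reference, cache.DeletionHandlingMetaNamespaceKeyFunc behaves roughly like the following (a sketch of client-go behavior, not code from this PR):

func deletionAwareKeySketch(obj interface{}) (string, error) {
	// If the informer lost track of the final state, it hands us a tombstone
	// that still carries the key computed before the object disappeared.
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		return tombstone.Key, nil
	}
	return cache.MetaNamespaceKeyFunc(obj)
}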
2 changes: 1 addition & 1 deletion pkg/controller/leaderelection.go
@@ -18,7 +18,7 @@ import (
)

const (
// Env var to lookup pod name in. In pod spec, you will have to specify it like this:
// Env var to lookup pod name in. InputReader pod spec, you will have to specify it like this:
// env:
// - name: POD_NAME
// valueFrom:
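At runtime the controller reads that variable to identify this replica in leader election; a minimal sketch of the consuming side (the fallback policy is an assumption, not this file's code):

podName, ok := os.LookupEnv("POD_NAME")
if !ok {
	// Without a stable identity, leader election cannot distinguish replicas;
	// real code might fall back to the hostname or fail fast instead.
	podName, _ = os.Hostname()
}
_ = podName // used as the election lock identity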
2 changes: 1 addition & 1 deletion pkg/controller/nodes/common/utils.go
@@ -11,7 +11,7 @@ import (
const maxUniqueIDLength = 20

// The UniqueId of a node is unique within a given workflow execution.
// In order to achieve that we track the lineage of the node.
// InputReader order to achieve that we track the lineage of the node.
// To compute the uniqueID of a node, we use the uniqueID and retry attempt of the parent node
// For nodes in level 0, there is no parent, and parentInfo is nil
func GenerateUniqueID(parentInfo executors.ImmutableParentInfo, nodeID string) (string, error) {
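A sketch of the scheme this comment describes: compose the parent's unique ID, the parent's retry attempt, and the node ID, then shorten deterministically once the result exceeds maxUniqueIDLength. The hashing choice below is illustrative; the real helper may differ:

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

func generateUniqueIDSketch(parentUniqueID string, parentAttempt uint32, nodeID string) string {
	composed := fmt.Sprintf("%s-%d-%s", parentUniqueID, parentAttempt, nodeID)
	if len(composed) <= maxUniqueIDLength { // 20, per the constant above
		return composed
	}
	sum := sha256.Sum256([]byte(composed)) // deterministic shortening (assumed)
	return hex.EncodeToString(sum[:])[:maxUniqueIDLength]
}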
2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/dynamic_workflow.go
@@ -180,7 +180,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return dynamicWorkflowContext{}, err
}

// TODO: In addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec.
// TODO: InputReader addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec.
// The reason they might be missing is because if a user yields a task that is SdkTask.fetch'ed, it should not be included
// See https://github.com/flyteorg/flyte/issues/219 for more information.

6 changes: 3 additions & 3 deletions pkg/controller/nodes/executor.go
@@ -140,7 +140,7 @@ func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve
return err
}

// In this method we check if the queue is ready to be processed and if so, we prime it in Admin as queued
// InputReader this method we check if the queue is ready to be processed and if so, we prime it in Admin as queued
// Before we start the node execution, we need to transition this Node status to Queued.
// This is because a node execution has to exist before task/wf executions can start.
func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructure, nCtx handler.NodeExecutionContext) (handler.PhaseInfo, error) {
@@ -441,7 +441,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
return executors.NodeStatusUndefined, errors.Wrapf(errors.EventRecordingFailed, nCtx.NodeID(), err, "failed to record node event")
}

// We reach here only when transitioning from Queued to Running. In this case, the startedAt is not set.
// We reach here only when transitioning from Queued to Running. InputReader this case, the startedAt is not set.
if np == v1alpha1.NodePhaseRunning {
if nodeStatus.GetQueuedAt() != nil {
c.metrics.QueuingLatency.Observe(ctx, nodeStatus.GetQueuedAt().Time, time.Now())
@@ -722,7 +722,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe
return c.handleNode(currentNodeCtx, dag, nCtx, h)

// TODO we can optimize skip state handling by iterating down the graph and marking all as skipped
// Currently we treat either Skip or Success the same way. In this approach only one node will be skipped
// Currently we treat either Skip or Success the same way. InputReader this approach only one node will be skipped
// at a time. As we iterate down, further nodes will be skipped
} else if nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseSkipped {
logger.Debugf(currentNodeCtx, "Node has [%v], traversing downstream.", nodePhase)
@@ -256,7 +256,7 @@ func TestGetSourceFromMetadata(t *testing.T) {
},
RetryAttempt: 0,
}},
// In legacy only taskVersionKey is available
// InputReader legacy only taskVersionKey is available
{"legacy", args{datasetMd: GetDatasetMetadataForSource(&tID).KeyMap, currentID: currentTaskID}, &core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
26 changes: 20 additions & 6 deletions pkg/controller/nodes/task/handler.go
@@ -112,13 +112,12 @@ func (p *pluginRequestedTransition) TransitionPreviouslyRecorded() {
p.previouslyObserved = true
}

func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths,
nodeExecutionMetadata handler.NodeExecutionMetadata, execContext executors.ExecutionContext) (*event.TaskExecutionEvent, error) {
func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
if p.previouslyObserved {
return nil, nil
}

return ToTaskExecutionEvent(id, in, out, p.pInfo, nodeExecutionMetadata, execContext)
input.Info = p.pInfo
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
@@ -579,7 +578,15 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
// STEP 4: Send buffered events!
logger.Debugf(ctx, "Sending buffered Task events.")
for _, ev := range tCtx.ber.GetAll(ctx) {
evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev, nCtx.NodeExecutionMetadata(), nCtx.ExecutionContext())
evInfo, err := ToTaskExecutionEvent(ToTaskExecutionEventInputs{
TaskExecID: &execID,
InputReader: nCtx.InputReader(),
OutputWriter: tCtx.ow,
Info: ev,
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
ExecContext: nCtx.ExecutionContext(),
TaskType: ttype,
})
if err != nil {
return handler.UnknownTransition, err
}

// STEP 5: Send Transition events
logger.Debugf(ctx, "Sending transition event for plugin phase [%s]", pluginTrns.pInfo.Phase().String())
evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow, nCtx.NodeExecutionMetadata(), nCtx.ExecutionContext())
evInfo, err := pluginTrns.FinalTaskEvent(ToTaskExecutionEventInputs{
TaskExecID: &execID,
InputReader: nCtx.InputReader(),
OutputWriter: tCtx.ow,
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
ExecContext: nCtx.ExecutionContext(),
TaskType: ttype,
})
if err != nil {
logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. Error: %s", err.Error())
return handler.UnknownTransition, err
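The signature change above is a parameter-object refactor: a single inputs struct (ToTaskExecutionEventInputs, defined in transformer.go below) replaces six positional arguments, so call sites read like labeled records and new fields, such as TaskType in this PR, can be added without touching every caller. The pattern in miniature, with purely illustrative names:

type eventInputsSketch struct {
	ID       string
	TaskType string // appended later; existing struct literals keep compiling
}

func buildEventSketch(in eventInputsSketch) string { return in.ID + ":" + in.TaskType }

// Call sites name what they pass, and omitted fields default to zero values:
//   buildEventSketch(eventInputsSketch{ID: "n0-0-t1"})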
48 changes: 30 additions & 18 deletions pkg/controller/nodes/task/transformer.go
@@ -69,52 +69,64 @@ func getParentNodeExecIDForTask(taskExecID *core.TaskExecutionIdentifier, execCo
return nodeExecutionID, nil
}

func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo,
nodeExecutionMetadata handler.NodeExecutionMetadata, execContext executors.ExecutionContext) (*event.TaskExecutionEvent, error) {
type ToTaskExecutionEventInputs struct {
TaskExecID *core.TaskExecutionIdentifier
InputReader io.InputFilePaths
OutputWriter io.OutputFilePaths
Info pluginCore.PhaseInfo
NodeExecutionMetadata handler.NodeExecutionMetadata
ExecContext executors.ExecutionContext
TaskType string
}

func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
// Transitions to a new phase

tm := ptypes.TimestampNow()
var err error
if i := info.Info(); i != nil && i.OccurredAt != nil {
if i := input.Info.Info(); i != nil && i.OccurredAt != nil {
tm, err = ptypes.TimestampProto(*i.OccurredAt)
if err != nil {
return nil, err
}
}

nodeExecutionID, err := getParentNodeExecIDForTask(taskExecID, execContext)
nodeExecutionID, err := getParentNodeExecIDForTask(input.TaskExecID, input.ExecContext)
if err != nil {
return nil, err
}
tev := &event.TaskExecutionEvent{
TaskId: taskExecID.TaskId,
TaskId: input.TaskExecID.TaskId,
ParentNodeExecutionId: nodeExecutionID,
RetryAttempt: taskExecID.RetryAttempt,
Phase: ToTaskEventPhase(info.Phase()),
PhaseVersion: info.Version(),
RetryAttempt: input.TaskExecID.RetryAttempt,
Phase: ToTaskEventPhase(input.Info.Phase()),
PhaseVersion: input.Info.Version(),
ProducerId: "propeller",
OccurredAt: tm,
InputUri: in.GetInputPath().String(),
InputUri: input.InputReader.GetInputPath().String(),
TaskType: input.TaskType,
Reason: input.Info.Reason(),
Metadata: input.Info.Info().Metadata,
}

if info.Phase().IsSuccess() && out != nil {
if out.GetOutputPath() != "" {
tev.OutputResult = &event.TaskExecutionEvent_OutputUri{OutputUri: out.GetOutputPath().String()}
if input.Info.Phase().IsSuccess() && input.OutputWriter != nil {
if input.OutputWriter.GetOutputPath() != "" {
tev.OutputResult = &event.TaskExecutionEvent_OutputUri{OutputUri: input.OutputWriter.GetOutputPath().String()}
}
}

if info.Phase().IsFailure() && info.Err() != nil {
if input.Info.Phase().IsFailure() && input.Info.Err() != nil {
tev.OutputResult = &event.TaskExecutionEvent_Error{
Error: info.Err(),
Error: input.Info.Err(),
}
}

if info.Info() != nil {
tev.Logs = info.Info().Logs
tev.CustomInfo = info.Info().CustomInfo
if input.Info.Info() != nil {
tev.Logs = input.Info.Info().Logs
tev.CustomInfo = input.Info.Info().CustomInfo
}

if nodeExecutionMetadata.IsInterruptible() {
if input.NodeExecutionMetadata.IsInterruptible() {
tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_INTERRUPTIBLE}
} else {
tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_DEFAULT}
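Net effect of the PR: task execution events now carry the task type, a phase-transition reason, and plugin metadata alongside the existing logs and custom info. A sketch of what a consumer can now see (field names follow the flyteidl TaskExecutionEvent used above; the helper itself is hypothetical):

func describeTaskEventSketch(tev *event.TaskExecutionEvent) string {
	// TaskType and Reason are among the fields this PR starts populating.
	return fmt.Sprintf("task=%s type=%s phase=%s reason=%q",
		tev.TaskId.GetName(), tev.TaskType, tev.Phase.String(), tev.Reason)
}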