Skip to content

Commit

Permalink
Pass along enhanced task events. (flyteorg#244)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Mar 24, 2021
1 parent 2d0e268 commit 45bd4d7
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/flyteorg/flytestdlib/cli/pflags"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/vektra/mockery/cmd/mockery"
)
4 changes: 2 additions & 2 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.24
github.com/flyteorg/flyteplugins v0.5.38
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flyteplugins v0.5.39
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
9 changes: 4 additions & 5 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.24 h1:Y4+y/tu6Qsb3jNXxuVsflycfSocfthUi6XsMgJTfGuc=
github.com/flyteorg/flyteidl v0.18.24/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.39 h1:nN8lK4SBtK3FvxSKHDiH/caNwTlb0V+DWAOIMCeFcu0=
github.com/flyteorg/flyteplugins v0.5.39/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
31 changes: 24 additions & 7 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,12 @@ func (p *pluginRequestedTransition) TransitionPreviouslyRecorded() {
p.previouslyObserved = true
}

// FinalTaskEvent builds the task execution event for the transition requested
// by the plugin. It returns (nil, nil) when the transition was already
// recorded (p.previouslyObserved), so no event is produced for it again.
//
// NOTE: this listing is a rendered diff without +/- markers. The two-line,
// multi-parameter signature and the first return statement are the removed
// lines; each is followed directly by its replacement, which takes a single
// ToTaskExecutionEventInputs value.
func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths,
nodeExecutionMetadata handler.NodeExecutionMetadata, execContext executors.ExecutionContext) (*event.TaskExecutionEvent, error) {
func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
// Already-recorded transitions yield no event and no error.
if p.previouslyObserved {
return nil, nil
}

return ToTaskExecutionEvent(id, in, out, p.pInfo, nodeExecutionMetadata, execContext)
// The phase info always comes from the transition itself; any Info value the
// caller placed in input is overwritten here.
input.Info = p.pInfo
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
Expand Down Expand Up @@ -575,11 +574,20 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "plugin transition is not observed and no error as well.")
}

execID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()
// STEP 4: Send buffered events!
logger.Debugf(ctx, "Sending buffered Task events.")
for _, ev := range tCtx.ber.GetAll(ctx) {
evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev, nCtx.NodeExecutionMetadata(), nCtx.ExecutionContext())
evInfo, err := ToTaskExecutionEvent(ToTaskExecutionEventInputs{
TaskExecContext: tCtx,
InputReader: nCtx.InputReader(),
OutputWriter: tCtx.ow,
Info: ev,
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
ExecContext: nCtx.ExecutionContext(),
TaskType: ttype,
PluginID: p.GetID(),
ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(),
})
if err != nil {
return handler.UnknownTransition, err
}
Expand All @@ -593,7 +601,16 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)

// STEP 5: Send Transition events
logger.Debugf(ctx, "Sending transition event for plugin phase [%s]", pluginTrns.pInfo.Phase().String())
evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow, nCtx.NodeExecutionMetadata(), nCtx.ExecutionContext())
evInfo, err := pluginTrns.FinalTaskEvent(ToTaskExecutionEventInputs{
TaskExecContext: tCtx,
InputReader: nCtx.InputReader(),
OutputWriter: tCtx.ow,
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
ExecContext: nCtx.ExecutionContext(),
TaskType: ttype,
PluginID: p.GetID(),
ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(),
})
if err != nil {
logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. Error: %s", err.Error())
return handler.UnknownTransition, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"sync"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -72,6 +74,12 @@ type Proxy struct {
BaseResourceManager
ResourceNamespacePrefix pluginCore.ResourceNamespace
ExecutionIdentifier *core.TaskExecutionIdentifier
ResourcePoolInfo map[string]*event.ResourcePoolInfo
}

// TaskResourceManager is the plugin-facing resource manager extended with a
// way to read back resource pool information as event.ResourcePoolInfo values.
type TaskResourceManager interface {
// The embedded pluginCore.ResourceManager contributes the plugin resource manager methods.
pluginCore.ResourceManager
// GetResourcePoolInfo returns the resource pool entries known to this manager.
GetResourcePoolInfo() []*event.ResourcePoolInfo
}

func (p Proxy) ComposeResourceConstraint(spec pluginCore.ResourceConstraintsSpec) []FullyQualifiedResourceConstraint {
Expand All @@ -92,6 +100,13 @@ func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.Resour
p.ResourceNamespacePrefix.CreateSubNamespace(namespace),
Token(allocationToken).prepend(ComposeTokenPrefix(p.ExecutionIdentifier)),
composedResourceConstraintList)
if err != nil {
return status, err
}
p.ResourcePoolInfo[allocationToken] = &event.ResourcePoolInfo{
AllocationToken: allocationToken,
Namespace: string(namespace),
}
return status, err
}

Expand All @@ -103,12 +118,21 @@ func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.Resourc
return err
}

// GetResourcePoolInfo flattens the proxy's ResourcePoolInfo map into a slice.
// The returned slice is never nil, holds the same pointers that are stored in
// the map (entries are not copied), and its order follows Go's map iteration
// order, which is unspecified.
func (p Proxy) GetResourcePoolInfo() []*event.ResourcePoolInfo {
	infos := make([]*event.ResourcePoolInfo, len(p.ResourcePoolInfo))
	next := 0
	for token := range p.ResourcePoolInfo {
		infos[next] = p.ResourcePoolInfo[token]
		next++
	}
	return infos
}

// GetTaskResourceManager wraps the base resource manager r in a Proxy carrying
// the given namespace prefix and task execution identifier, and starts it with
// an empty (non-nil) ResourcePoolInfo map.
//
// NOTE: this listing is a rendered diff without +/- markers. The line ending in
// "pluginCore.ResourceManager {" is the removed return type; the line after it,
// ending in "TaskResourceManager {", is its replacement.
func GetTaskResourceManager(r BaseResourceManager, resourceNamespacePrefix pluginCore.ResourceNamespace,
id *core.TaskExecutionIdentifier) pluginCore.ResourceManager {
id *core.TaskExecutionIdentifier) TaskResourceManager {
return Proxy{
BaseResourceManager: r,
ResourceNamespacePrefix: resourceNamespacePrefix,
ExecutionIdentifier: id,
// Allocated up front: AllocateResource writes into this map, and writing to a nil map panics.
ResourcePoolInfo: make(map[string]*event.ResourcePoolInfo),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package resourcemanager

import (
"context"
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
core2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
rmConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager/config"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

Expand Down Expand Up @@ -53,3 +60,18 @@ func TestToken_prepend(t *testing.T) {
})
}
}

// TestTaskResourceManager checks that a resource allocated through the task
// resource manager is reported back by GetResourcePoolInfo with the namespace
// and allocation token that were passed to AllocateResource.
func TestTaskResourceManager(t *testing.T) {
	ctx := context.TODO()

	// Fail fast on setup errors instead of discarding them: a nil builder or
	// manager would otherwise surface later as an opaque nil-pointer panic.
	rmBuilder, err := GetResourceManagerBuilderByType(ctx, rmConfig.TypeNoop, promutils.NewTestScope())
	if err != nil {
		t.Fatalf("getting noop resource manager builder: %v", err)
	}
	rm, err := rmBuilder.BuildResourceManager(ctx)
	if err != nil {
		t.Fatalf("building resource manager: %v", err)
	}

	taskResourceManager := GetTaskResourceManager(rm, "namespace", &core.TaskExecutionIdentifier{})
	_, err = taskResourceManager.AllocateResource(ctx, "namespace", "allocation token", core2.ResourceConstraintsSpec{})
	assert.NoError(t, err)

	resourcePoolInfo := taskResourceManager.GetResourcePoolInfo()
	assert.EqualValues(t, []*event.ResourcePoolInfo{
		{
			Namespace:       "namespace",
			AllocationToken: "allocation token",
		},
	}, resourcePoolInfo)
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (t taskExecutionMetadata) GetMaxAttempts() uint32 {
type taskExecutionContext struct {
handler.NodeExecutionContext
tm taskExecutionMetadata
rm pluginCore.ResourceManager
rm resourcemanager.TaskResourceManager
psm *pluginStateManager
tr handler.TaskReader
ow *ioutils.BufferedOutputWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"context"
"testing"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"

mocks2 "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -148,8 +153,19 @@ func TestHandler_newTaskExecutionContext(t *testing.T) {
assert.Equal(t, got.TaskExecutionMetadata().GetTaskExecutionID().GetID().NodeExecutionId.GetNodeId(), nodeID)
assert.Equal(t, got.TaskExecutionMetadata().GetTaskExecutionID().GetID().NodeExecutionId.GetExecutionId(), wfExecID)

assert.EqualValues(t, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo(), make([]*event.ResourcePoolInfo, 0))

// TODO @kumare fix this test
assert.NotNil(t, got.ResourceManager())
assert.NotNil(t, got.rm)

_, err = got.rm.AllocateResource(context.TODO(), "foo", "token", pluginCore.ResourceConstraintsSpec{})
assert.NoError(t, err)
assert.EqualValues(t, []*event.ResourcePoolInfo{
{
Namespace: "foo",
AllocationToken: "token",
},
}, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo())
assert.Nil(t, got.Catalog())
// assert.Equal(t, got.InputReader(), ir)
}
58 changes: 40 additions & 18 deletions flytepropeller/pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,55 +69,77 @@ func getParentNodeExecIDForTask(taskExecID *core.TaskExecutionIdentifier, execCo
return nodeExecutionID, nil
}

func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo,
nodeExecutionMetadata handler.NodeExecutionMetadata, execContext executors.ExecutionContext) (*event.TaskExecutionEvent, error) {
// ToTaskExecutionEventInputs bundles everything ToTaskExecutionEvent needs to
// build an event.TaskExecutionEvent, replacing its former positional
// parameter list.
type ToTaskExecutionEventInputs struct {
// TaskExecContext supplies the task execution ID and the generated name.
TaskExecContext pluginCore.TaskExecutionContext
// InputReader provides the input path reported as the event's InputUri.
InputReader io.InputFilePaths
// OutputWriter provides the output path reported on success; it may be nil.
OutputWriter io.OutputFilePaths
// Info is the plugin phase info (phase, version, reason, error, logs, custom info, metadata).
// FinalTaskEvent overwrites it with the transition's own phase info.
Info pluginCore.PhaseInfo
// NodeExecutionMetadata decides the instance class (interruptible or default).
NodeExecutionMetadata handler.NodeExecutionMetadata
// ExecContext is used to resolve the parent node execution ID.
ExecContext executors.ExecutionContext
// TaskType is copied to the event's TaskType field.
TaskType string
// PluginID is copied to the event metadata's PluginIdentifier.
PluginID string
// ResourcePoolInfo is copied to the event metadata's ResourcePoolInfo.
ResourcePoolInfo []*event.ResourcePoolInfo
}

func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
// Transitions to a new phase

tm := ptypes.TimestampNow()
var err error
if i := info.Info(); i != nil && i.OccurredAt != nil {
if i := input.Info.Info(); i != nil && i.OccurredAt != nil {
tm, err = ptypes.TimestampProto(*i.OccurredAt)
if err != nil {
return nil, err
}
}

nodeExecutionID, err := getParentNodeExecIDForTask(taskExecID, execContext)
taskExecID := input.TaskExecContext.TaskExecutionMetadata().GetTaskExecutionID().GetID()
nodeExecutionID, err := getParentNodeExecIDForTask(&taskExecID, input.ExecContext)
if err != nil {
return nil, err
}
metadata := input.Info.Info().Metadata
if metadata == nil {
metadata = &event.TaskExecutionMetadata{}
}
metadata.PluginIdentifier = input.PluginID
metadata.GeneratedName = input.TaskExecContext.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
metadata.ResourcePoolInfo = input.ResourcePoolInfo
tev := &event.TaskExecutionEvent{
TaskId: taskExecID.TaskId,
ParentNodeExecutionId: nodeExecutionID,
RetryAttempt: taskExecID.RetryAttempt,
Phase: ToTaskEventPhase(info.Phase()),
PhaseVersion: info.Version(),
Phase: ToTaskEventPhase(input.Info.Phase()),
PhaseVersion: input.Info.Version(),
ProducerId: "propeller",
OccurredAt: tm,
InputUri: in.GetInputPath().String(),
InputUri: input.InputReader.GetInputPath().String(),
TaskType: input.TaskType,
Reason: input.Info.Reason(),
Metadata: metadata,
}

if info.Phase().IsSuccess() && out != nil {
if out.GetOutputPath() != "" {
tev.OutputResult = &event.TaskExecutionEvent_OutputUri{OutputUri: out.GetOutputPath().String()}
if input.Info.Phase().IsSuccess() && input.OutputWriter != nil {
if input.OutputWriter.GetOutputPath() != "" {
tev.OutputResult = &event.TaskExecutionEvent_OutputUri{OutputUri: input.OutputWriter.GetOutputPath().String()}
}
}

if info.Phase().IsFailure() && info.Err() != nil {
if input.Info.Phase().IsFailure() && input.Info.Err() != nil {
tev.OutputResult = &event.TaskExecutionEvent_Error{
Error: info.Err(),
Error: input.Info.Err(),
}
}

if info.Info() != nil {
tev.Logs = info.Info().Logs
tev.CustomInfo = info.Info().CustomInfo
if input.Info.Info() != nil {
tev.Logs = input.Info.Info().Logs
tev.CustomInfo = input.Info.Info().CustomInfo
}

if nodeExecutionMetadata.IsInterruptible() {
tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_INTERRUPTIBLE}
if input.NodeExecutionMetadata.IsInterruptible() {
tev.Metadata.InstanceClass = event.TaskExecutionMetadata_INTERRUPTIBLE
} else {
tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_DEFAULT}
tev.Metadata.InstanceClass = event.TaskExecutionMetadata_DEFAULT
}

return tev, nil
Expand Down
Loading

0 comments on commit 45bd4d7

Please sign in to comment.