Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Pass along enhanced task events. #244

Merged
merged 18 commits into from
Mar 24, 2021
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/flyteorg/flytestdlib/cli/pflags"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/vektra/mockery/cmd/mockery"
)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.24
github.com/flyteorg/flyteplugins v0.5.38
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flyteplugins v0.5.39
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.17/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.24 h1:Y4+y/tu6Qsb3jNXxuVsflycfSocfthUi6XsMgJTfGuc=
github.com/flyteorg/flyteidl v0.18.24/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.38 h1:xAQ1J23cRxzwNDgzbmRuuvflq2PFetntRCjuM5RBfTw=
github.com/flyteorg/flyteplugins v0.5.38/go.mod h1:CxerBGWWEmNYmPxSMHnwQEr9cc1Fbo/g5fcABazU6Jo=
github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.39 h1:nN8lK4SBtK3FvxSKHDiH/caNwTlb0V+DWAOIMCeFcu0=
github.com/flyteorg/flyteplugins v0.5.39/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
31 changes: 24 additions & 7 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,12 @@ func (p *pluginRequestedTransition) TransitionPreviouslyRecorded() {
p.previouslyObserved = true
}

func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths,
nodeExecutionMetadata handler.NodeExecutionMetadata, execContext executors.ExecutionContext) (*event.TaskExecutionEvent, error) {
func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
if p.previouslyObserved {
return nil, nil
}

return ToTaskExecutionEvent(id, in, out, p.pInfo, nodeExecutionMetadata, execContext)
input.Info = p.pInfo
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
Expand Down Expand Up @@ -575,11 +574,20 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "plugin transition is not observed and no error as well.")
}

execID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()
// STEP 4: Send buffered events!
logger.Debugf(ctx, "Sending buffered Task events.")
for _, ev := range tCtx.ber.GetAll(ctx) {
evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev, nCtx.NodeExecutionMetadata(), nCtx.ExecutionContext())
evInfo, err := ToTaskExecutionEvent(ToTaskExecutionEventInputs{
TaskExecContext: tCtx,
InputReader: nCtx.InputReader(),
OutputWriter: tCtx.ow,
Info: ev,
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
ExecContext: nCtx.ExecutionContext(),
TaskType: ttype,
PluginID: p.GetID(),
ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(),
})
if err != nil {
return handler.UnknownTransition, err
}
Expand All @@ -593,7 +601,16 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)

// STEP 5: Send Transition events
logger.Debugf(ctx, "Sending transition event for plugin phase [%s]", pluginTrns.pInfo.Phase().String())
evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow, nCtx.NodeExecutionMetadata(), nCtx.ExecutionContext())
evInfo, err := pluginTrns.FinalTaskEvent(ToTaskExecutionEventInputs{
TaskExecContext: tCtx,
InputReader: nCtx.InputReader(),
OutputWriter: tCtx.ow,
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
ExecContext: nCtx.ExecutionContext(),
TaskType: ttype,
PluginID: p.GetID(),
ResourcePoolInfo: tCtx.rm.GetResourcePoolInfo(),
})
if err != nil {
logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. Error: %s", err.Error())
return handler.UnknownTransition, err
Expand Down
26 changes: 25 additions & 1 deletion pkg/controller/nodes/task/resourcemanager/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"sync"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -72,6 +74,12 @@ type Proxy struct {
BaseResourceManager
ResourceNamespacePrefix pluginCore.ResourceNamespace
ExecutionIdentifier *core.TaskExecutionIdentifier
ResourcePoolInfo map[string]*event.ResourcePoolInfo
}

// TaskResourceManager is the plugin-facing resource manager
// (pluginCore.ResourceManager) extended with access to the resource pool
// information collected for a task.
type TaskResourceManager interface {
pluginCore.ResourceManager
// GetResourcePoolInfo returns the resource pool entries known to this
// resource manager.
GetResourcePoolInfo() []*event.ResourcePoolInfo
}

func (p Proxy) ComposeResourceConstraint(spec pluginCore.ResourceConstraintsSpec) []FullyQualifiedResourceConstraint {
Expand All @@ -92,6 +100,13 @@ func (p Proxy) AllocateResource(ctx context.Context, namespace pluginCore.Resour
p.ResourceNamespacePrefix.CreateSubNamespace(namespace),
Token(allocationToken).prepend(ComposeTokenPrefix(p.ExecutionIdentifier)),
composedResourceConstraintList)
if err != nil {
return status, err
}
p.ResourcePoolInfo[allocationToken] = &event.ResourcePoolInfo{
AllocationToken: allocationToken,
Namespace: string(namespace),
}
return status, err
}

Expand All @@ -103,12 +118,21 @@ func (p Proxy) ReleaseResource(ctx context.Context, namespace pluginCore.Resourc
return err
}

// GetResourcePoolInfo returns every entry stored in the proxy's
// ResourcePoolInfo map as a newly allocated, non-nil slice. The entries are
// gathered by ranging over a map, so their order is unspecified.
func (p Proxy) GetResourcePoolInfo() []*event.ResourcePoolInfo {
	infos := make([]*event.ResourcePoolInfo, len(p.ResourcePoolInfo))
	next := 0
	for token := range p.ResourcePoolInfo {
		infos[next] = p.ResourcePoolInfo[token]
		next++
	}
	return infos
}

func GetTaskResourceManager(r BaseResourceManager, resourceNamespacePrefix pluginCore.ResourceNamespace,
id *core.TaskExecutionIdentifier) pluginCore.ResourceManager {
id *core.TaskExecutionIdentifier) TaskResourceManager {
return Proxy{
BaseResourceManager: r,
ResourceNamespacePrefix: resourceNamespacePrefix,
ExecutionIdentifier: id,
ResourcePoolInfo: make(map[string]*event.ResourcePoolInfo),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package resourcemanager

import (
"context"
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
core2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
rmConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager/config"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

Expand Down Expand Up @@ -53,3 +60,18 @@ func TestToken_prepend(t *testing.T) {
})
}
}

// TestTaskResourceManager verifies that an allocation made through the task
// resource manager is reported back by GetResourcePoolInfo with the namespace
// and allocation token that were passed to AllocateResource.
func TestTaskResourceManager(t *testing.T) {
	ctx := context.TODO()

	// Fail fast on setup errors instead of continuing with an unusable builder
	// or resource manager.
	rmBuilder, err := GetResourceManagerBuilderByType(ctx, rmConfig.TypeNoop, promutils.NewTestScope())
	if err != nil {
		t.Fatalf("getting noop resource manager builder: %v", err)
	}
	rm, err := rmBuilder.BuildResourceManager(ctx)
	if err != nil {
		t.Fatalf("building noop resource manager: %v", err)
	}

	taskResourceManager := GetTaskResourceManager(rm, "namespace", &core.TaskExecutionIdentifier{})
	_, err = taskResourceManager.AllocateResource(ctx, "namespace", "allocation token", core2.ResourceConstraintsSpec{})
	assert.NoError(t, err)

	resourcePoolInfo := taskResourceManager.GetResourcePoolInfo()
	assert.EqualValues(t, []*event.ResourcePoolInfo{
		{
			Namespace:       "namespace",
			AllocationToken: "allocation token",
		},
	}, resourcePoolInfo)
}
2 changes: 1 addition & 1 deletion pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (t taskExecutionMetadata) GetMaxAttempts() uint32 {
type taskExecutionContext struct {
handler.NodeExecutionContext
tm taskExecutionMetadata
rm pluginCore.ResourceManager
rm resourcemanager.TaskResourceManager
psm *pluginStateManager
tr handler.TaskReader
ow *ioutils.BufferedOutputWriter
Expand Down
18 changes: 17 additions & 1 deletion pkg/controller/nodes/task/taskexec_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"context"
"testing"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"

mocks2 "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -148,8 +153,19 @@ func TestHandler_newTaskExecutionContext(t *testing.T) {
assert.Equal(t, got.TaskExecutionMetadata().GetTaskExecutionID().GetID().NodeExecutionId.GetNodeId(), nodeID)
assert.Equal(t, got.TaskExecutionMetadata().GetTaskExecutionID().GetID().NodeExecutionId.GetExecutionId(), wfExecID)

assert.EqualValues(t, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo(), make([]*event.ResourcePoolInfo, 0))

// TODO @kumare fix this test
assert.NotNil(t, got.ResourceManager())
assert.NotNil(t, got.rm)

_, err = got.rm.AllocateResource(context.TODO(), "foo", "token", pluginCore.ResourceConstraintsSpec{})
assert.NoError(t, err)
assert.EqualValues(t, []*event.ResourcePoolInfo{
{
Namespace: "foo",
AllocationToken: "token",
},
}, got.ResourceManager().(resourcemanager.TaskResourceManager).GetResourcePoolInfo())
assert.Nil(t, got.Catalog())
// assert.Equal(t, got.InputReader(), ir)
}
58 changes: 40 additions & 18 deletions pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,55 +69,77 @@ func getParentNodeExecIDForTask(taskExecID *core.TaskExecutionIdentifier, execCo
return nodeExecutionID, nil
}

func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo,
nodeExecutionMetadata handler.NodeExecutionMetadata, execContext executors.ExecutionContext) (*event.TaskExecutionEvent, error) {
// ToTaskExecutionEventInputs bundles everything ToTaskExecutionEvent needs to
// build a task execution event.
type ToTaskExecutionEventInputs struct {
// TaskExecContext supplies the task execution ID and generated name via
// TaskExecutionMetadata().GetTaskExecutionID().
TaskExecContext pluginCore.TaskExecutionContext
// InputReader provides the input path reported as the event's InputUri.
InputReader io.InputFilePaths
// OutputWriter provides the output path reported as the event's OutputUri
// when the phase is a success; it may be nil.
OutputWriter io.OutputFilePaths
// Info is the plugin phase info (phase, version, reason, error, logs,
// custom info) the event is built from.
Info pluginCore.PhaseInfo
// NodeExecutionMetadata decides the event's instance class through
// IsInterruptible().
NodeExecutionMetadata handler.NodeExecutionMetadata
// ExecContext is passed on when resolving the parent node execution ID.
ExecContext executors.ExecutionContext
// TaskType is copied to the event's TaskType.
TaskType string
// PluginID is copied to the event metadata's PluginIdentifier.
PluginID string
// ResourcePoolInfo is copied to the event metadata's ResourcePoolInfo.
ResourcePoolInfo []*event.ResourcePoolInfo
}

func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) {
// Transitions to a new phase

tm := ptypes.TimestampNow()
var err error
if i := info.Info(); i != nil && i.OccurredAt != nil {
if i := input.Info.Info(); i != nil && i.OccurredAt != nil {
tm, err = ptypes.TimestampProto(*i.OccurredAt)
if err != nil {
return nil, err
}
}

nodeExecutionID, err := getParentNodeExecIDForTask(taskExecID, execContext)
taskExecID := input.TaskExecContext.TaskExecutionMetadata().GetTaskExecutionID().GetID()
nodeExecutionID, err := getParentNodeExecIDForTask(&taskExecID, input.ExecContext)
if err != nil {
return nil, err
}
metadata := input.Info.Info().Metadata
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
if metadata == nil {
metadata = &event.TaskExecutionMetadata{}
}
metadata.PluginIdentifier = input.PluginID
metadata.GeneratedName = input.TaskExecContext.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
metadata.ResourcePoolInfo = input.ResourcePoolInfo
tev := &event.TaskExecutionEvent{
TaskId: taskExecID.TaskId,
ParentNodeExecutionId: nodeExecutionID,
RetryAttempt: taskExecID.RetryAttempt,
Phase: ToTaskEventPhase(info.Phase()),
PhaseVersion: info.Version(),
Phase: ToTaskEventPhase(input.Info.Phase()),
PhaseVersion: input.Info.Version(),
ProducerId: "propeller",
OccurredAt: tm,
InputUri: in.GetInputPath().String(),
InputUri: input.InputReader.GetInputPath().String(),
TaskType: input.TaskType,
Reason: input.Info.Reason(),
Metadata: metadata,
}

if info.Phase().IsSuccess() && out != nil {
if out.GetOutputPath() != "" {
tev.OutputResult = &event.TaskExecutionEvent_OutputUri{OutputUri: out.GetOutputPath().String()}
if input.Info.Phase().IsSuccess() && input.OutputWriter != nil {
if input.OutputWriter.GetOutputPath() != "" {
tev.OutputResult = &event.TaskExecutionEvent_OutputUri{OutputUri: input.OutputWriter.GetOutputPath().String()}
}
}

if info.Phase().IsFailure() && info.Err() != nil {
if input.Info.Phase().IsFailure() && input.Info.Err() != nil {
tev.OutputResult = &event.TaskExecutionEvent_Error{
Error: info.Err(),
Error: input.Info.Err(),
}
}

if info.Info() != nil {
tev.Logs = info.Info().Logs
tev.CustomInfo = info.Info().CustomInfo
if input.Info.Info() != nil {
tev.Logs = input.Info.Info().Logs
tev.CustomInfo = input.Info.Info().CustomInfo
}

if nodeExecutionMetadata.IsInterruptible() {
tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_INTERRUPTIBLE}
if input.NodeExecutionMetadata.IsInterruptible() {
tev.Metadata.InstanceClass = event.TaskExecutionMetadata_INTERRUPTIBLE
} else {
tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_DEFAULT}
tev.Metadata.InstanceClass = event.TaskExecutionMetadata_DEFAULT
}

return tev, nil
Expand Down
Loading