Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add TaskExecutionMetadata to TaskExecutionEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan committed Sep 23, 2020
1 parent e49292e commit 0e56642
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.18.6
github.com/lyft/flyteidl v0.18.7-0.20200923210508-5e52ea4ac960
github.com/lyft/flyteplugins v0.5.1
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,9 @@ github.com/lyft/datacatalog v0.2.1 h1:W7LsAjaS297iLCtSH9ZaAAG3YPofwkbbgIaqkfdeM0
github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4=
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.6 h1:HGbxHI8avEDvoPqcO2+/BoJVcP9sjOj4qwJ/wNRWuoA=
github.com/lyft/flyteidl v0.18.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.7-0.20200923210508-5e52ea4ac960 h1:1A5aOwZcnrMJyYE3ilUbmUcrmcAEEQ/n1m+iPO41Jws=
github.com/lyft/flyteidl v0.18.7-0.20200923210508-5e52ea4ac960/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.7-0.20200904001213-41861229003a h1:NcURTOidN/PUOMRSKvaKMJmSdGcdLJHPwZFmpSX+KVk=
github.com/lyft/flyteplugins v0.4.7-0.20200904001213-41861229003a/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flyteplugins v0.4.8 h1:fGihSTfOw1LEGuwh7QfRp3l2dT28DfGFAu2d0axr8Q4=
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ func (p *pluginRequestedTransition) TransitionPreviouslyRecorded() {
p.previouslyObserved = true
}

func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths) (*event.TaskExecutionEvent, error) {
func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths,
nodeExecutionMetadata handler.NodeExecutionMetadata) (*event.TaskExecutionEvent, error) {
if p.previouslyObserved {
return nil, nil
}

return ToTaskExecutionEvent(id, in, out, p.pInfo)
return ToTaskExecutionEvent(id, in, out, p.pInfo, nodeExecutionMetadata)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
Expand Down Expand Up @@ -542,7 +543,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
// STEP 4: Send buffered events!
logger.Debugf(ctx, "Sending buffered Task events.")
for _, ev := range tCtx.ber.GetAll(ctx) {
evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev)
evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev, nCtx.NodeExecutionMetadata())
if err != nil {
return handler.UnknownTransition, err
}
Expand All @@ -556,7 +557,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)

// STEP 5: Send Transition events
logger.Debugf(ctx, "Sending transition event for plugin phase [%s]", pluginTrns.pInfo.Phase().String())
evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow)
evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow, nCtx.NodeExecutionMetadata())
if err != nil {
logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. Error: %s", err.Error())
return handler.UnknownTransition, err
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) {
Kind: "sample",
Name: "name",
})
nm.OnIsInterruptible().Return(false)

tk := &core.TaskTemplate{
Id: &core.Identifier{ResourceType: core.ResourceType_TASK, Project: "proj", Domain: "dom", Version: "ver"},
Expand Down Expand Up @@ -680,6 +681,7 @@ func Test_task_Handle_Catalog(t *testing.T) {
Kind: "sample",
Name: "name",
})
nm.OnIsInterruptible().Return(true)

taskID := &core.Identifier{}
tk := &core.TaskTemplate{
Expand Down Expand Up @@ -902,6 +904,7 @@ func Test_task_Handle_Barrier(t *testing.T) {
Kind: "sample",
Name: "name",
})
nm.OnIsInterruptible().Return(true)

taskID := &core.Identifier{}
tk := &core.TaskTemplate{
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func trimErrorMessage(original string, maxLength int) string {
return original[0:maxLength/2] + original[len(original)-maxLength/2:]
}

func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo) (*event.TaskExecutionEvent, error) {
func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo,
nodeExecutionMetadata handler.NodeExecutionMetadata) (*event.TaskExecutionEvent, error) {
// Transitions to a new phase

tm := ptypes.TimestampNow()
Expand Down Expand Up @@ -90,6 +91,12 @@ func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputF
tev.CustomInfo = info.Info().CustomInfo
}

if nodeExecutionMetadata.IsInterruptible() {
tev.Metadata = &event.TaskExecutionMetadata{InstanceType: event.TaskExecutionMetadata_INTERRUPTIBLE}
} else {
tev.Metadata = &event.TaskExecutionMetadata{InstanceType: event.TaskExecutionMetadata_DEFAULT}
}

return tev, nil
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/controller/nodes/task/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"

"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
Expand All @@ -13,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
handlerMocks "github.com/lyft/flytepropeller/pkg/controller/nodes/handler/mocks"
)

func TestToTaskEventPhase(t *testing.T) {
Expand Down Expand Up @@ -68,7 +71,10 @@ func TestToTaskExecutionEvent(t *testing.T) {
const outputPath = "out"
out.On("GetOutputPath").Return(storage.DataReference(outputPath))

tev, err := ToTaskExecutionEvent(id, in, out, pluginCore.PhaseInfoWaitingForResources(n, 0, "reason"))
nodeExecutionMetadata := handlerMocks.NodeExecutionMetadata{}
nodeExecutionMetadata.OnIsInterruptible().Return(true)
tev, err := ToTaskExecutionEvent(id, in, out, pluginCore.PhaseInfoWaitingForResources(n, 0, "reason"),
&nodeExecutionMetadata)
assert.NoError(t, err)
assert.Nil(t, tev.Logs)
assert.Equal(t, core.TaskExecution_WAITING_FOR_RESOURCES, tev.Phase)
Expand All @@ -78,6 +84,7 @@ func TestToTaskExecutionEvent(t *testing.T) {
assert.Equal(t, nodeID, tev.ParentNodeExecutionId)
assert.Equal(t, inputPath, tev.InputUri)
assert.Nil(t, tev.OutputResult)
assert.Equal(t, event.TaskExecutionMetadata_INTERRUPTIBLE, tev.Metadata.InstanceType)

l := []*core.TaskLog{
{Uri: "x", Name: "y", MessageFormat: core.TaskLog_JSON},
Expand All @@ -87,7 +94,7 @@ func TestToTaskExecutionEvent(t *testing.T) {
OccurredAt: &n,
Logs: l,
CustomInfo: c,
}))
}), &nodeExecutionMetadata)
assert.NoError(t, err)
assert.Equal(t, core.TaskExecution_RUNNING, tev.Phase)
assert.Equal(t, uint32(1), tev.PhaseVersion)
Expand All @@ -98,12 +105,15 @@ func TestToTaskExecutionEvent(t *testing.T) {
assert.Equal(t, nodeID, tev.ParentNodeExecutionId)
assert.Equal(t, inputPath, tev.InputUri)
assert.Nil(t, tev.OutputResult)
assert.Equal(t, event.TaskExecutionMetadata_INTERRUPTIBLE, tev.Metadata.InstanceType)

defaultNodeExecutionMetadata := handlerMocks.NodeExecutionMetadata{}
defaultNodeExecutionMetadata.OnIsInterruptible().Return(false)
tev, err = ToTaskExecutionEvent(id, in, out, pluginCore.PhaseInfoSuccess(&pluginCore.TaskInfo{
OccurredAt: &n,
Logs: l,
CustomInfo: c,
}))
}), &defaultNodeExecutionMetadata)
assert.NoError(t, err)
assert.Equal(t, core.TaskExecution_SUCCEEDED, tev.Phase)
assert.Equal(t, uint32(0), tev.PhaseVersion)
Expand All @@ -116,6 +126,7 @@ func TestToTaskExecutionEvent(t *testing.T) {
assert.NotNil(t, tev.OutputResult)
assert.Equal(t, inputPath, tev.InputUri)
assert.Equal(t, outputPath, tev.GetOutputUri())
assert.Empty(t, event.TaskExecutionMetadata_DEFAULT, tev.Metadata.InstanceType)
}

func TestToTransitionType(t *testing.T) {
Expand Down

0 comments on commit 0e56642

Please sign in to comment.