Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial WIP on CustomInfo for agents #5391

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
UpdatedAt: reportedAt,
CreatedAt: input.Request.Event.OccurredAt,
Logs: input.Request.Event.Logs,
// TODO: does this help us pass CustomInfo at all?
CustomInfo: input.Request.Event.CustomInfo,
TaskType: input.Request.Event.TaskType,
Metadata: metadata,
Expand Down Expand Up @@ -288,6 +289,8 @@ func mergeCustom(existing, latest *_struct.Struct) (*_struct.Struct, error) {
return &response, nil
}

// TODO: this merging seems to accumulate logs -- should it also accumulate CustomInfo?

// mergeExternalResource combines the latest ExternalResourceInfo proto with an existing instance
// by updating fields and merging logs.
func mergeExternalResource(existing, latest *event.ExternalResourceInfo) *event.ExternalResourceInfo {
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type ExternalResource struct {
RetryAttempt uint32
// Phase (if exists) associated with the external resource
Phase Phase
// TODO: should there also be a CustomInfo here?
}

type ReasonInfo struct {
Expand Down
6 changes: 4 additions & 2 deletions flyteplugins/go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl
t := time.Now()

nowTaskInfo := &core.TaskInfo{
OccurredAt: &t,
Logs: logLinks,
OccurredAt: &t,
Logs: logLinks,
// TODO: is this missing CustomInfo ? Should it be copied from externalResources? Or passed as a param?
// CustomInfo: &structpb.Struct{Fields: map[string]*structpb.Value{}},
ExternalResources: externalResources,
}

Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
logLinks = phaseInfo.Info().Logs
}

// TODO: it looks like this is where .CustomInfo should be used

externalResources = append(externalResources, &core.ExternalResource{
ExternalID: podName,
Index: uint32(originalIdx),
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func TestCheckSubTasksState(t *testing.T) {
resourceManager.AssertNumberOfCalls(t, "AllocateResource", 0)
resourceManager.AssertNumberOfCalls(t, "ReleaseResource", 0)

// TODO: write tests for CustomInfo here?
assert.Equal(t, subtaskCount, len(externalResources))
for i := 0; i < subtaskCount; i++ {
externalResource := externalResources[i]
Expand Down
27 changes: 17 additions & 10 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
structpb "github.com/golang/protobuf/ptypes/struct"
)

type Registry map[string]map[int32]*Agent // map[taskTypeName][taskTypeVersion] => Agent
Expand All @@ -38,6 +39,8 @@ type ResourceWrapper struct {
Outputs *flyteIdl.LiteralMap
Message string
LogLinks []*flyteIdl.TaskLog
// TODO: should there also be a CustomInfo here?
CustomInfo *structpb.Struct
}

// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state.
Expand Down Expand Up @@ -184,10 +187,11 @@ func (p Plugin) ExecuteTaskSync(
}

return nil, ResourceWrapper{
Phase: resource.Phase,
Outputs: resource.Outputs,
Message: resource.Message,
LogLinks: resource.LogLinks,
Phase: resource.Phase,
Outputs: resource.Outputs,
Message: resource.Message,
LogLinks: resource.LogLinks,
CustomInfo: resource.CustomInfo,
}, err
}

Expand All @@ -213,11 +217,12 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba
}

return ResourceWrapper{
Phase: res.Resource.Phase,
State: res.Resource.State,
Outputs: res.Resource.Outputs,
Message: res.Resource.Message,
LogLinks: res.Resource.LogLinks,
Phase: res.Resource.Phase,
State: res.Resource.State,
Outputs: res.Resource.Outputs,
Message: res.Resource.Message,
LogLinks: res.Resource.LogLinks,
CustomInfo: res.Resource.CustomInfo,
}, nil
}

Expand Down Expand Up @@ -246,7 +251,9 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error

func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
resource := taskCtx.Resource().(ResourceWrapper)
taskInfo := &core.TaskInfo{Logs: resource.LogLinks}
// taskInfo := &core.TaskInfo{Logs: resource.LogLinks}
// TODO: should this actually be something like this?
taskInfo := &core.TaskInfo{Logs: resource.LogLinks, CustomInfo: resource.CustomInfo}

switch resource.Phase {
case flyteIdl.TaskExecution_QUEUED:
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@

if input.Info.Info() != nil {
tev.Logs = input.Info.Info().Logs
// TODO: this is one place whre CustomInfo is actually used

Check failure on line 181 in flytepropeller/pkg/controller/nodes/task/transformer.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

whre ==> where
tev.CustomInfo = input.Info.Info().CustomInfo
}

Expand Down
Loading