Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Jun 6, 2024
1 parent cbd751d commit d608f4b
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 0 deletions.
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type TaskExecutionContext interface {

// Returns a handle to the Task events recorder, which get stored in the Admin.
EventsRecorder() EventsRecorder

// TODO: Is this right? Where else can I plop this in?
// ConsoleURL() string
}

// A simple fire-and-forget func
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type TaskExecutionID interface {
// GetUniqueNodeID returns the fully-qualified Node ID that is unique within a
// given workflow execution.
GetUniqueNodeID() string

GetConsoleURL() string
}

// TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flytek8s

import (
"context"
"fmt"
"os"
"strconv"

Expand Down Expand Up @@ -69,6 +70,15 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
// },
}

if id.GetConsoleURL() != "" {
envVars = append(envVars, v1.EnvVar{
Name: "FLYTE_EXECUTION_URL",
// TODO: is it safe to append `attempt` like this?
// TODO: should we use net/url to build this url?
Value: fmt.Sprintf("%s/projects/%s/domains/%s/executions/%s/nodeId/%s-%s/nodes", id.GetConsoleURL(), nodeExecutionID.Project, nodeExecutionID.Domain, nodeExecutionID.Name, id.GetUniqueNodeID(), attemptNumber),
})
}

// Task definition Level env variables.
if id.GetID().TaskId != nil {
taskID := id.GetID().TaskId
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (w *ContextualWorkflow) GetExecutionConfig() v1alpha1.ExecutionConfig {
return v1alpha1.ExecutionConfig{}
}

func (w *ContextualWorkflow) GetConsoleURL() string { return "" }

type WorkflowPrinter struct {
}

Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ type Meta interface {
GetEventVersion() EventVersion
GetDefinitionVersion() WorkflowDefinitionVersion
GetRawOutputDataConfig() RawOutputDataConfig
GetConsoleURL() string
}

type TaskDetailsGetter interface {
Expand All @@ -534,6 +535,7 @@ type ExecutableWorkflow interface {
MetaExtended
NodeStatusGetter
GetExecutionConfig() ExecutionConfig
GetConsoleURL() string
}

type NodeStatusGetter interface {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ImmutableExecutionContext interface {
GetID() v1alpha1.WorkflowID
GetOnFailurePolicy() v1alpha1.WorkflowOnFailurePolicy
GetExecutionConfig() v1alpha1.ExecutionConfig
GetConsoleURL() string
}

type ParentInfoGetter interface {
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ type nodeExecutor struct {
shardSelector ioutils.ShardSelector
store *storage.DataStore
taskRecorder events.TaskEventRecorder
consoleURL string
}

func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type NodeExecutionMetadata interface {
GetSecurityContext() core.SecurityContext
IsInterruptible() bool
GetInterruptibleFailureThreshold() int32
GetConsoleURL() string
}

type NodeExecutionContext interface {
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (e nodeExecMetadata) GetLabels() map[string]string {
return e.nodeLabels
}

func (e nodeExecMetadata) GetConsoleURL() string { return e.Meta.GetConsoleURL() }

type nodeExecContext struct {
store *storage.DataStore
tr interfaces.TaskReader
Expand Down
6 changes: 6 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type taskExecutionID struct {
execName string
id *core.TaskExecutionIdentifier
uniqueNodeID string
consoleURL string
}

func (te taskExecutionID) GetID() core.TaskExecutionIdentifier {
Expand Down Expand Up @@ -63,6 +64,8 @@ func (te taskExecutionID) GetGeneratedNameWith(minLength, maxLength int) (string
return te.execName, nil
}

func (te taskExecutionID) GetConsoleURL() string { return te.consoleURL }

type taskExecutionMetadata struct {
interfaces.NodeExecutionMetadata
taskExecID taskExecutionID
Expand Down Expand Up @@ -290,6 +293,8 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N
return nil, err
}

nCtx.Node().GetTaskID()

return &taskExecutionContext{
NodeExecutionContext: nCtx,
tm: taskExecutionMetadata{
Expand All @@ -298,6 +303,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N
execName: uniqueID,
id: id,
uniqueNodeID: currentNodeUniqueID,
consoleURL: nCtx.NodeExecutionMetadata().GetConsoleURL(),
},
o: nCtx.Node(),
maxAttempts: maxAttempts,
Expand Down

0 comments on commit d608f4b

Please sign in to comment.