Skip to content

Commit

Permalink
more wip
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Jun 6, 2024
1 parent d608f4b commit c3fc152
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 55 deletions.
3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ type TaskExecutionContext interface {

// Returns a handle to the Task events recorder, which get stored in the Admin.
EventsRecorder() EventsRecorder

// TODO: Is this right? Where else can I plop this in?
// ConsoleURL() string
}

// A simple fire-and-forget func
Expand Down
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type TaskExecutionID interface {
// GetUniqueNodeID returns the fully-qualified Node ID that is unique within a
// given workflow execution.
GetUniqueNodeID() string

GetConsoleURL() string
}

// TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the
Expand All @@ -55,4 +53,5 @@ type TaskExecutionMetadata interface {
GetPlatformResources() *v1.ResourceRequirements
GetInterruptibleFailureThreshold() int32
GetEnvironmentVariables() map[string]string
GetConsoleURL() string
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.
}
container.Args = modifiedArgs

container.Env, container.EnvFrom = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID())
container.Env, container.EnvFrom = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID(), parameters.TaskExecMetadata.GetConsoleURL())

// retrieve platformResources and overrideResources to use when aggregating container resources
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar {
return envVars
}

func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1.EnvVar {

if id == nil || id.GetID().NodeExecutionId == nil || id.GetID().NodeExecutionId.ExecutionId == nil {
return []v1.EnvVar{}
Expand Down Expand Up @@ -70,12 +70,12 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
// },
}

if id.GetConsoleURL() != "" {
if consoleURL != "" {
envVars = append(envVars, v1.EnvVar{
Name: "FLYTE_EXECUTION_URL",
// TODO: is it safe to append `attempt` like this?
// TODO: should we use net/url to build this url?
Value: fmt.Sprintf("%s/projects/%s/domains/%s/executions/%s/nodeId/%s-%s/nodes", id.GetConsoleURL(), nodeExecutionID.Project, nodeExecutionID.Domain, nodeExecutionID.Name, id.GetUniqueNodeID(), attemptNumber),
Value: fmt.Sprintf("%s/projects/%s/domains/%s/executions/%s/nodeId/%s-%s/nodes", consoleURL, nodeExecutionID.Project, nodeExecutionID.Domain, nodeExecutionID.Name, id.GetUniqueNodeID(), attemptNumber),
})
}

Expand Down Expand Up @@ -123,9 +123,9 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
return envVars
}

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID) ([]v1.EnvVar, []v1.EnvFromSource) {
func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource) {
envVars = append(envVars, GetContextEnvVars(ctx)...)
envVars = append(envVars, GetExecutionEnvVars(id)...)
envVars = append(envVars, GetExecutionEnvVars(id, consoleURL)...)

for k, v := range taskEnvironmentVariables {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInpu

func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, containerEnvVars []*core.KeyValuePair,
defaultEnvVars map[string]string) []v1.EnvVar {
envVars, _ := flytek8s.DecorateEnvVars(ctx, flytek8s.ToK8sEnvVar(containerEnvVars), nil, execID)
envVars, _ := flytek8s.DecorateEnvVars(ctx, flytek8s.ToK8sEnvVar(containerEnvVars), nil, execID, "")
m := make(map[string]string, len(envVars))
for _, envVar := range envVars {
m[envVar.Name] = envVar.Value
Expand Down
1 change: 0 additions & 1 deletion flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ type ExecutableWorkflow interface {
MetaExtended
NodeStatusGetter
GetExecutionConfig() ExecutionConfig
GetConsoleURL() string
}

type NodeStatusGetter interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type ImmutableExecutionContext interface {
GetID() v1alpha1.WorkflowID
GetOnFailurePolicy() v1alpha1.WorkflowOnFailurePolicy
GetExecutionConfig() v1alpha1.ExecutionConfig
GetConsoleURL() string
}

type ParentInfoGetter interface {
Expand Down
1 change: 0 additions & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ type nodeExecutor struct {
shardSelector ioutils.ShardSelector
store *storage.DataStore
taskRecorder events.TaskEventRecorder
consoleURL string
}

func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) {
Expand Down
2 changes: 0 additions & 2 deletions flytepropeller/pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ func (e nodeExecMetadata) GetLabels() map[string]string {
return e.nodeLabels
}

func (e nodeExecMetadata) GetConsoleURL() string { return e.Meta.GetConsoleURL() }

type nodeExecContext struct {
store *storage.DataStore
tr interfaces.TaskReader
Expand Down
6 changes: 0 additions & 6 deletions flytepropeller/pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type taskExecutionID struct {
execName string
id *core.TaskExecutionIdentifier
uniqueNodeID string
consoleURL string
}

func (te taskExecutionID) GetID() core.TaskExecutionIdentifier {
Expand Down Expand Up @@ -64,8 +63,6 @@ func (te taskExecutionID) GetGeneratedNameWith(minLength, maxLength int) (string
return te.execName, nil
}

func (te taskExecutionID) GetConsoleURL() string { return te.consoleURL }

type taskExecutionMetadata struct {
interfaces.NodeExecutionMetadata
taskExecID taskExecutionID
Expand Down Expand Up @@ -293,8 +290,6 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N
return nil, err
}

nCtx.Node().GetTaskID()

return &taskExecutionContext{
NodeExecutionContext: nCtx,
tm: taskExecutionMetadata{
Expand All @@ -303,7 +298,6 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N
execName: uniqueID,
id: id,
uniqueNodeID: currentNodeUniqueID,
consoleURL: nCtx.NodeExecutionMetadata().GetConsoleURL(),
},
o: nCtx.Node(),
maxAttempts: maxAttempts,
Expand Down

0 comments on commit c3fc152

Please sign in to comment.