diff --git a/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go b/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go index a2f20bcee5..f43c98c781 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go @@ -63,6 +63,9 @@ type TaskExecutionContext interface { // Returns a handle to the Task events recorder, which get stored in the Admin. EventsRecorder() EventsRecorder + + // TODO: Is this right? Where else can I plop this in? + // ConsoleURL() string } // A simple fire-and-forget func diff --git a/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go b/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go index 83d7dbcf12..b3c3986a4b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go @@ -32,6 +32,8 @@ type TaskExecutionID interface { // GetUniqueNodeID returns the fully-qualified Node ID that is unique within a // given workflow execution. GetUniqueNodeID() string + + GetConsoleURL() string } // TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go index 44596bf82f..d17244ca2b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go @@ -12,6 +12,38 @@ type TaskExecutionID struct { mock.Mock } +type TaskExecutionID_GetConsoleURL struct { + *mock.Call +} + +func (_m TaskExecutionID_GetConsoleURL) Return(_a0 string) *TaskExecutionID_GetConsoleURL { + return &TaskExecutionID_GetConsoleURL{Call: _m.Call.Return(_a0)} +} + +func (_m *TaskExecutionID) OnGetConsoleURL() *TaskExecutionID_GetConsoleURL { + c_call := _m.On("GetConsoleURL") + return &TaskExecutionID_GetConsoleURL{Call: c_call} +} + +func (_m *TaskExecutionID) OnGetConsoleURLMatch(matchers ...interface{}) *TaskExecutionID_GetConsoleURL { + c_call := _m.On("GetConsoleURL", matchers...) + return &TaskExecutionID_GetConsoleURL{Call: c_call} +} + +// GetConsoleURL provides a mock function with given fields: +func (_m *TaskExecutionID) GetConsoleURL() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type TaskExecutionID_GetGeneratedName struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go index b235725edd..6ce29302a9 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go @@ -2,6 +2,7 @@ package flytek8s import ( "context" + "fmt" "os" "strconv" @@ -69,6 +70,15 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar { // }, } + if id.GetConsoleURL() != "" { + envVars = append(envVars, v1.EnvVar{ + Name: "FLYTE_EXECUTION_URL", + // TODO: is it safe to append `attempt` like this? + // TODO: should we use net/url to build this url? + Value: fmt.Sprintf("%s/projects/%s/domains/%s/executions/%s/nodeId/%s-%s/nodes", id.GetConsoleURL(), nodeExecutionID.Project, nodeExecutionID.Domain, nodeExecutionID.Name, id.GetUniqueNodeID(), attemptNumber), + }) + } + // Task definition Level env variables. if id.GetID().TaskId != nil { taskID := id.GetID().TaskId diff --git a/flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go b/flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go index ff881c938e..24d750661e 100644 --- a/flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go +++ b/flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go @@ -47,6 +47,8 @@ func (w *ContextualWorkflow) GetExecutionConfig() v1alpha1.ExecutionConfig { return v1alpha1.ExecutionConfig{} } +func (w *ContextualWorkflow) GetConsoleURL() string { return "" } + type WorkflowPrinter struct { } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index f92cca4a5a..5ace6b37ce 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -511,6 +511,7 @@ type Meta interface { GetEventVersion() EventVersion GetDefinitionVersion() WorkflowDefinitionVersion GetRawOutputDataConfig() RawOutputDataConfig + GetConsoleURL() string } type TaskDetailsGetter interface { @@ -534,6 +535,7 @@ type ExecutableWorkflow interface { MetaExtended NodeStatusGetter GetExecutionConfig() ExecutionConfig + GetConsoleURL() string } type NodeStatusGetter interface { diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go index b417e2f892..093a93ac2d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go @@ -163,6 +163,38 @@ func (_m *ExecutableWorkflow) GetConnections() *v1alpha1.Connections { return r0 } +type ExecutableWorkflow_GetConsoleURL struct { + *mock.Call +} + +func (_m ExecutableWorkflow_GetConsoleURL) Return(_a0 string) *ExecutableWorkflow_GetConsoleURL { + return &ExecutableWorkflow_GetConsoleURL{Call: _m.Call.Return(_a0)} +} + +func (_m *ExecutableWorkflow) OnGetConsoleURL() *ExecutableWorkflow_GetConsoleURL { + c_call := _m.On("GetConsoleURL") + return &ExecutableWorkflow_GetConsoleURL{Call: c_call} +} + +func (_m *ExecutableWorkflow) OnGetConsoleURLMatch(matchers ...interface{}) *ExecutableWorkflow_GetConsoleURL { + c_call := _m.On("GetConsoleURL", matchers...) + return &ExecutableWorkflow_GetConsoleURL{Call: c_call} +} + +// GetConsoleURL provides a mock function with given fields: +func (_m *ExecutableWorkflow) GetConsoleURL() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type ExecutableWorkflow_GetCreationTimestamp struct { *mock.Call } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go index e99227b099..4e098aab2f 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go @@ -52,6 +52,38 @@ func (_m *Meta) GetAnnotations() map[string]string { return r0 } +type Meta_GetConsoleURL struct { + *mock.Call +} + +func (_m Meta_GetConsoleURL) Return(_a0 string) *Meta_GetConsoleURL { + return &Meta_GetConsoleURL{Call: _m.Call.Return(_a0)} +} + +func (_m *Meta) OnGetConsoleURL() *Meta_GetConsoleURL { + c_call := _m.On("GetConsoleURL") + return &Meta_GetConsoleURL{Call: c_call} +} + +func (_m *Meta) OnGetConsoleURLMatch(matchers ...interface{}) *Meta_GetConsoleURL { + c_call := _m.On("GetConsoleURL", matchers...) + return &Meta_GetConsoleURL{Call: c_call} +} + +// GetConsoleURL provides a mock function with given fields: +func (_m *Meta) GetConsoleURL() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type Meta_GetCreationTimestamp struct { *mock.Call } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go index 50e478b6d4..95915088da 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go @@ -86,6 +86,38 @@ func (_m *MetaExtended) GetAnnotations() map[string]string { return r0 } +type MetaExtended_GetConsoleURL struct { + *mock.Call +} + +func (_m MetaExtended_GetConsoleURL) Return(_a0 string) *MetaExtended_GetConsoleURL { + return &MetaExtended_GetConsoleURL{Call: _m.Call.Return(_a0)} +} + +func (_m *MetaExtended) OnGetConsoleURL() *MetaExtended_GetConsoleURL { + c_call := _m.On("GetConsoleURL") + return &MetaExtended_GetConsoleURL{Call: c_call} +} + +func (_m *MetaExtended) OnGetConsoleURLMatch(matchers ...interface{}) *MetaExtended_GetConsoleURL { + c_call := _m.On("GetConsoleURL", matchers...) + return &MetaExtended_GetConsoleURL{Call: c_call} +} + +// GetConsoleURL provides a mock function with given fields: +func (_m *MetaExtended) GetConsoleURL() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type MetaExtended_GetCreationTimestamp struct { *mock.Call } diff --git a/flytepropeller/pkg/controller/executors/execution_context.go b/flytepropeller/pkg/controller/executors/execution_context.go index 84799a0400..229201df6f 100644 --- a/flytepropeller/pkg/controller/executors/execution_context.go +++ b/flytepropeller/pkg/controller/executors/execution_context.go @@ -19,6 +19,7 @@ type ImmutableExecutionContext interface { GetID() v1alpha1.WorkflowID GetOnFailurePolicy() v1alpha1.WorkflowOnFailurePolicy GetExecutionConfig() v1alpha1.ExecutionConfig + GetConsoleURL() string } type ParentInfoGetter interface { diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 1c42357623..d8d8c5e8ca 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -500,6 +500,7 @@ type nodeExecutor struct { shardSelector ioutils.ShardSelector store *storage.DataStore taskRecorder events.TaskEventRecorder + consoleURL string } func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) { diff --git a/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go b/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go index c78bdfb557..26b41cc94b 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_metadata.go @@ -51,6 +51,38 @@ func (_m *NodeExecutionMetadata) GetAnnotations() map[string]string { return r0 } +type NodeExecutionMetadata_GetConsoleURL struct { + *mock.Call +} + +func (_m NodeExecutionMetadata_GetConsoleURL) Return(_a0 string) *NodeExecutionMetadata_GetConsoleURL { + return &NodeExecutionMetadata_GetConsoleURL{Call: _m.Call.Return(_a0)} +} + +func (_m *NodeExecutionMetadata) OnGetConsoleURL() *NodeExecutionMetadata_GetConsoleURL { + c_call := _m.On("GetConsoleURL") + return &NodeExecutionMetadata_GetConsoleURL{Call: c_call} +} + +func (_m *NodeExecutionMetadata) OnGetConsoleURLMatch(matchers ...interface{}) *NodeExecutionMetadata_GetConsoleURL { + c_call := _m.On("GetConsoleURL", matchers...) + return &NodeExecutionMetadata_GetConsoleURL{Call: c_call} +} + +// GetConsoleURL provides a mock function with given fields: +func (_m *NodeExecutionMetadata) GetConsoleURL() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type NodeExecutionMetadata_GetInterruptibleFailureThreshold struct { *mock.Call } diff --git a/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go b/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go index b6a33a4e35..1ea60f89b0 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go @@ -39,6 +39,7 @@ type NodeExecutionMetadata interface { GetSecurityContext() core.SecurityContext IsInterruptible() bool GetInterruptibleFailureThreshold() int32 + GetConsoleURL() string } type NodeExecutionContext interface { diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index a579b241f3..d5a04dbfe6 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -118,6 +118,8 @@ func (e nodeExecMetadata) GetLabels() map[string]string { return e.nodeLabels } +func (e nodeExecMetadata) GetConsoleURL() string { return e.Meta.GetConsoleURL() } + type nodeExecContext struct { store *storage.DataStore tr interfaces.TaskReader diff --git a/flytepropeller/pkg/controller/nodes/task/taskexec_context.go b/flytepropeller/pkg/controller/nodes/task/taskexec_context.go index 25b936a8e4..d0810b8a29 100644 --- a/flytepropeller/pkg/controller/nodes/task/taskexec_context.go +++ b/flytepropeller/pkg/controller/nodes/task/taskexec_context.go @@ -36,6 +36,7 @@ type taskExecutionID struct { execName string id *core.TaskExecutionIdentifier uniqueNodeID string + consoleURL string } func (te taskExecutionID) GetID() core.TaskExecutionIdentifier { @@ -63,6 +64,8 @@ func (te taskExecutionID) GetGeneratedNameWith(minLength, maxLength int) (string return te.execName, nil } +func (te taskExecutionID) GetConsoleURL() string { return te.consoleURL } + type taskExecutionMetadata struct { interfaces.NodeExecutionMetadata taskExecID taskExecutionID @@ -290,6 +293,8 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N return nil, err } + nCtx.Node().GetTaskID() + return &taskExecutionContext{ NodeExecutionContext: nCtx, tm: taskExecutionMetadata{ @@ -298,6 +303,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx interfaces.N execName: uniqueID, id: id, uniqueNodeID: currentNodeUniqueID, + consoleURL: nCtx.NodeExecutionMetadata().GetConsoleURL(), }, o: nCtx.Node(), maxAttempts: maxAttempts,