Skip to content

Commit

Permalink
Passthrough unique node ID in task execution ID for generating log te…
Browse files Browse the repository at this point in the history
…mplate vars

Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb committed Nov 8, 2023
1 parent bec7bbb commit ac764a9
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 221 deletions.
5 changes: 3 additions & 2 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flytestdlib/logger"
)
Expand All @@ -18,7 +19,7 @@ type logPlugin struct {
}

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID *core.TaskExecutionIdentifier, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand Down Expand Up @@ -53,7 +54,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionIdentifier: taskExecID,
TaskExecutionID: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
},
)
Expand Down
59 changes: 33 additions & 26 deletions flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,41 @@ import (
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
coreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

const podName = "PodName"

var dummyTaskExecID = &core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: "my-task-name",
Project: "my-task-project",
Domain: "my-task-domain",
Version: "1",
},
NodeExecutionId: &core.NodeExecutionIdentifier{
NodeId: "n0",
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my-execution-name",
Project: "my-execution-project",
Domain: "my-execution-domain",
func dummyTaskExecID() pluginCore.TaskExecutionID {
tID := &coreMocks.TaskExecutionID{}
tID.OnGetGeneratedName().Return("generated-name")
tID.OnGetID().Return(core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: "my-task-name",
Project: "my-task-project",
Domain: "my-task-domain",
Version: "1",
},
},
RetryAttempt: 1,
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my-execution-name",
Project: "my-execution-project",
Domain: "my-execution-domain",
},
},
RetryAttempt: 1,
})
tID.OnGetUniqueNodeID().Return("n0-0-n0")
return tID
}

func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) {
logPlugin, err := InitializeLogPlugins(&LogConfig{})
assert.NoError(t, err)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, nil, 0, " Suffix", nil)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, l)
}
Expand All @@ -49,7 +56,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) {
CloudwatchLogGroup: "/kubernetes/flyte-production",
})
assert.NoError(t, err)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, nil, 0, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -80,7 +87,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand All @@ -105,7 +112,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -135,7 +142,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -165,7 +172,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -198,7 +205,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 2)
}
Expand Down Expand Up @@ -229,7 +236,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -303,7 +310,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c
},
}

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " my-Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil)
assert.Nil(tb, err)
assert.Len(tb, logs, len(expectedTaskLogs))
if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 {
Expand Down Expand Up @@ -337,7 +344,7 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
Name: "StackDriver my-Suffix",
},
{
Uri: "https://flyte.corp.net/console/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/n0/taskId/my-task-name/attempt/1/view/logs",
Uri: "https://flyte.corp.net/console/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/n0-0-n0/taskId/my-task-name/attempt/1/view/logs",
MessageFormat: core.TaskLog_JSON,
Name: "Internal my-Suffix",
},
Expand Down
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type TaskExecutionID interface {

// GetID returns the underlying idl task identifier.
GetID() core.TaskExecutionIdentifier

// GetUniqueNodeID returns the fully-qualified Node ID that is unique within a
// given workflow execution.
GetUniqueNodeID() string
}

// TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"regexp"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
)

//go:generate enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml
Expand Down Expand Up @@ -42,7 +43,7 @@ type Input struct {
PodUnixStartTime int64
PodUnixFinishTime int64
PodUID string
TaskExecutionIdentifier *core.TaskExecutionIdentifier
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVarsByScheme *TemplateVarsByScheme
}

Expand Down
96 changes: 48 additions & 48 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,55 +114,55 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...)
}
case TemplateSchemeTaskExecution:
if input.TaskExecutionIdentifier != nil {
vars = append(vars, TemplateVar{
taskExecutionIdentifier := input.TaskExecutionID.GetID()
vars = append(
vars,
TemplateVar{
defaultRegexes.NodeID,
input.TaskExecutionID.GetUniqueNodeID(),
},
TemplateVar{
defaultRegexes.TaskRetryAttempt,
strconv.FormatUint(uint64(input.TaskExecutionIdentifier.RetryAttempt), 10),
})
if input.TaskExecutionIdentifier.TaskId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.TaskID,
input.TaskExecutionIdentifier.TaskId.Name,
},
TemplateVar{
defaultRegexes.TaskVersion,
input.TaskExecutionIdentifier.TaskId.Version,
},
TemplateVar{
defaultRegexes.TaskProject,
input.TaskExecutionIdentifier.TaskId.Project,
},
TemplateVar{
defaultRegexes.TaskDomain,
input.TaskExecutionIdentifier.TaskId.Domain,
},
)
}
if input.TaskExecutionIdentifier.NodeExecutionId != nil {
vars = append(vars, TemplateVar{
defaultRegexes.NodeID,
input.TaskExecutionIdentifier.NodeExecutionId.NodeId,
})
if input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.ExecutionName,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Name,
},
TemplateVar{
defaultRegexes.ExecutionProject,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Project,
},
TemplateVar{
defaultRegexes.ExecutionDomain,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Domain,
},
)
}
}
strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10),
},
)
if taskExecutionIdentifier.TaskId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.TaskID,
taskExecutionIdentifier.TaskId.Name,
},
TemplateVar{
defaultRegexes.TaskVersion,
taskExecutionIdentifier.TaskId.Version,
},
TemplateVar{
defaultRegexes.TaskProject,
taskExecutionIdentifier.TaskId.Project,
},
TemplateVar{
defaultRegexes.TaskDomain,
taskExecutionIdentifier.TaskId.Domain,
},
)
}
if taskExecutionIdentifier.NodeExecutionId != nil && taskExecutionIdentifier.NodeExecutionId.ExecutionId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.ExecutionName,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Name,
},
TemplateVar{
defaultRegexes.ExecutionProject,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Project,
},
TemplateVar{
defaultRegexes.ExecutionDomain,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Domain,
},
)
}
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...)
Expand Down
Loading

0 comments on commit ac764a9

Please sign in to comment.