Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passthrough unique node ID in task execution ID for generating log te… #4380

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
v1 "k8s.io/api/core/v1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flytestdlib/logger"
)
Expand All @@ -18,7 +19,7 @@ type logPlugin struct {
}

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID *core.TaskExecutionIdentifier, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand Down Expand Up @@ -53,7 +54,7 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionIdentifier: taskExecID,
TaskExecutionID: taskExecID,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backwards compatibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn’t think of issues with backward compatibility since this will only affect new log links. Maybe I’m missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, the Input object becomes your interface, right? if somebody is referencing {.TaskIdentifier.Project} that will no longer resolve if I'm reading correctly, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, the external-facing template variables remain unchanged.

ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
},
)
Expand Down
59 changes: 33 additions & 26 deletions flyteplugins/go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,41 @@ import (
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
coreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

const podName = "PodName"

var dummyTaskExecID = &core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: "my-task-name",
Project: "my-task-project",
Domain: "my-task-domain",
Version: "1",
},
NodeExecutionId: &core.NodeExecutionIdentifier{
NodeId: "n0",
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my-execution-name",
Project: "my-execution-project",
Domain: "my-execution-domain",
func dummyTaskExecID() pluginCore.TaskExecutionID {
tID := &coreMocks.TaskExecutionID{}
tID.OnGetGeneratedName().Return("generated-name")
tID.OnGetID().Return(core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: "my-task-name",
Project: "my-task-project",
Domain: "my-task-domain",
Version: "1",
},
},
RetryAttempt: 1,
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my-execution-name",
Project: "my-execution-project",
Domain: "my-execution-domain",
},
},
RetryAttempt: 1,
})
tID.OnGetUniqueNodeID().Return("n0-0-n0")
return tID
}

func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) {
logPlugin, err := InitializeLogPlugins(&LogConfig{})
assert.NoError(t, err)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, nil, 0, " Suffix", nil)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, l)
}
Expand All @@ -49,7 +56,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) {
CloudwatchLogGroup: "/kubernetes/flyte-production",
})
assert.NoError(t, err)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, nil, 0, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), nil, 0, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -80,7 +87,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand All @@ -105,7 +112,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 1, " Suffix", nil)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 1, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -135,7 +142,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -165,7 +172,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -198,7 +205,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 2)
}
Expand Down Expand Up @@ -229,7 +236,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -303,7 +310,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c
},
}

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " my-Suffix", nil)
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID(), pod, 0, " my-Suffix", nil)
assert.Nil(tb, err)
assert.Len(tb, logs, len(expectedTaskLogs))
if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 {
Expand Down Expand Up @@ -337,7 +344,7 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
Name: "StackDriver my-Suffix",
},
{
Uri: "https://flyte.corp.net/console/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/n0/taskId/my-task-name/attempt/1/view/logs",
Uri: "https://flyte.corp.net/console/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/n0-0-n0/taskId/my-task-name/attempt/1/view/logs",
MessageFormat: core.TaskLog_JSON,
Name: "Internal my-Suffix",
},
Expand Down
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type TaskExecutionID interface {

// GetID returns the underlying idl task identifier.
GetID() core.TaskExecutionIdentifier

// GetUniqueNodeID returns the fully-qualified Node ID that is unique within a
// given workflow execution.
GetUniqueNodeID() string
}

// TaskExecutionMetadata represents any execution information for a Task. It is used to communicate meta information about the
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func (m mockTaskExecutionIdentifier) GetGeneratedName() string {
return "task-exec-name"
}

func (m mockTaskExecutionIdentifier) GetUniqueNodeID() string {
return "unique-node-id"
}

func TestDecorateEnvVars(t *testing.T) {
ctx := context.Background()
ctx = contextutils.WithWorkflowID(ctx, "fake_workflow")
Expand Down
3 changes: 2 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"regexp"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
)

//go:generate enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml
Expand Down Expand Up @@ -42,7 +43,7 @@ type Input struct {
PodUnixStartTime int64
PodUnixFinishTime int64
PodUID string
TaskExecutionIdentifier *core.TaskExecutionIdentifier
TaskExecutionID pluginsCore.TaskExecutionID
ExtraTemplateVarsByScheme *TemplateVarsByScheme
}

Expand Down
96 changes: 48 additions & 48 deletions flyteplugins/go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,55 +114,55 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.Pod...)
}
case TemplateSchemeTaskExecution:
if input.TaskExecutionIdentifier != nil {
vars = append(vars, TemplateVar{
taskExecutionIdentifier := input.TaskExecutionID.GetID()
vars = append(
vars,
TemplateVar{
defaultRegexes.NodeID,
input.TaskExecutionID.GetUniqueNodeID(),
},
TemplateVar{
defaultRegexes.TaskRetryAttempt,
strconv.FormatUint(uint64(input.TaskExecutionIdentifier.RetryAttempt), 10),
})
if input.TaskExecutionIdentifier.TaskId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.TaskID,
input.TaskExecutionIdentifier.TaskId.Name,
},
TemplateVar{
defaultRegexes.TaskVersion,
input.TaskExecutionIdentifier.TaskId.Version,
},
TemplateVar{
defaultRegexes.TaskProject,
input.TaskExecutionIdentifier.TaskId.Project,
},
TemplateVar{
defaultRegexes.TaskDomain,
input.TaskExecutionIdentifier.TaskId.Domain,
},
)
}
if input.TaskExecutionIdentifier.NodeExecutionId != nil {
vars = append(vars, TemplateVar{
defaultRegexes.NodeID,
input.TaskExecutionIdentifier.NodeExecutionId.NodeId,
})
if input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.ExecutionName,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Name,
},
TemplateVar{
defaultRegexes.ExecutionProject,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Project,
},
TemplateVar{
defaultRegexes.ExecutionDomain,
input.TaskExecutionIdentifier.NodeExecutionId.ExecutionId.Domain,
},
)
}
}
strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10),
},
)
if taskExecutionIdentifier.TaskId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.TaskID,
taskExecutionIdentifier.TaskId.Name,
},
TemplateVar{
defaultRegexes.TaskVersion,
taskExecutionIdentifier.TaskId.Version,
},
TemplateVar{
defaultRegexes.TaskProject,
taskExecutionIdentifier.TaskId.Project,
},
TemplateVar{
defaultRegexes.TaskDomain,
taskExecutionIdentifier.TaskId.Domain,
},
)
}
if taskExecutionIdentifier.NodeExecutionId != nil && taskExecutionIdentifier.NodeExecutionId.ExecutionId != nil {
vars = append(
vars,
TemplateVar{
defaultRegexes.ExecutionName,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Name,
},
TemplateVar{
defaultRegexes.ExecutionProject,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Project,
},
TemplateVar{
defaultRegexes.ExecutionDomain,
taskExecutionIdentifier.NodeExecutionId.ExecutionId.Domain,
},
)
}
if gotExtraTemplateVars {
vars = append(vars, input.ExtraTemplateVarsByScheme.TaskExecution...)
Expand Down
Loading
Loading