Skip to content

Commit

Permalink
Add support for using task execution ID fields in log URI templates (f…
Browse files Browse the repository at this point in the history
…lyteorg#372)

* Refactor task log templates to support extra vars

* plumbing for k8s

* Cleanup use of providers

* cleanup

* more cleanups

* more cleanups

* more cleanups

* Plumb through task context for plugins

* fix

Signed-off-by: Jeev B <[email protected]>

* move task execution identifier into Input struct

* use pointer

* tests

Signed-off-by: Jeev B <[email protected]>

* fix linting

Signed-off-by: Jeev B <[email protected]>

* cleanups

* fix

* revert to using regex replacements for performance

* Readd benchmark

* Split templating scheme (flyteorg#373)

* Use a split templating scheme

* add enum for TemplateScheme

* cleanup comment

* fix linting issues

* add unit tests for templateVarsForScheme

* update for consistency

---------

Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb authored Jul 15, 2023
1 parent 3d2525a commit 7ddd993
Show file tree
Hide file tree
Showing 23 changed files with 874 additions and 250 deletions.
2 changes: 2 additions & 0 deletions go/tasks/logs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logs
import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

//go:generate pflags LogConfig --default-var=DefaultConfig
Expand Down Expand Up @@ -38,6 +39,7 @@ type TemplateLogPluginConfig struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
}

var (
Expand Down
45 changes: 22 additions & 23 deletions go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type logPlugin struct {
}

// Internal
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID *core.TaskExecutionIdentifier, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) {
if logPlugin == nil {
return nil, nil
}
Expand All @@ -43,16 +43,18 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, pod

logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: pod.Name,
PodUID: string(pod.GetUID()),
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339),
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
PodName: pod.Name,
PodUID: string(pod.GetUID()),
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodRFC3339StartTime: time.Unix(startTime, 0).Format(time.RFC3339),
PodRFC3339FinishTime: time.Unix(finishTime, 0).Format(time.RFC3339),
PodUnixStartTime: startTime,
PodUnixFinishTime: finishTime,
TaskExecutionIdentifier: taskExecID,
ExtraTemplateVarsByScheme: extraLogTemplateVarsByScheme,
},
)

Expand All @@ -70,6 +72,7 @@ type taskLogPluginWrapper struct {
func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) {
logs := make([]*core.TaskLog, 0, len(t.logPlugins))
suffix := input.LogName

for _, plugin := range t.logPlugins {
input.LogName = plugin.Name + suffix
o, err := plugin.Plugin.GetTaskLogs(input)
Expand All @@ -92,41 +95,37 @@ func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)})
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)})
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)})
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(
[]string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)})
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)})
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(
[]string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)})
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)})
}
}

if len(cfg.Templates) > 0 {
for _, cfg := range cfg.Templates {
logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.TemplateURIs, cfg.MessageFormat)})
logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.Scheme, cfg.TemplateURIs, cfg.MessageFormat)})
}
}

if len(logPlugins) == 0 {
return nil, nil
}

return taskLogPluginWrapper{
logPlugins: logPlugins,
}, nil
return taskLogPluginWrapper{logPlugins: logPlugins}, nil
}
51 changes: 42 additions & 9 deletions go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/go-test/deep"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -14,10 +15,29 @@ import (

const podName = "PodName"

var dummyTaskExecID = &core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: "my-task-name",
Project: "my-task-project",
Domain: "my-task-domain",
Version: "1",
},
NodeExecutionId: &core.NodeExecutionIdentifier{
NodeId: "n0",
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "my-execution-name",
Project: "my-execution-project",
Domain: "my-execution-domain",
},
},
RetryAttempt: 1,
}

func TestGetLogsForContainerInPod_NoPlugins(t *testing.T) {
logPlugin, err := InitializeLogPlugins(&LogConfig{})
assert.NoError(t, err)
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, nil, 0, " Suffix")
l, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, nil, 0, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, l)
}
Expand All @@ -29,7 +49,7 @@ func TestGetLogsForContainerInPod_NoLogs(t *testing.T) {
CloudwatchLogGroup: "/kubernetes/flyte-production",
})
assert.NoError(t, err)
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, nil, 0, " Suffix")
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, nil, 0, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -60,7 +80,7 @@ func TestGetLogsForContainerInPod_BadIndex(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 1, " Suffix")
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 1, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand All @@ -85,7 +105,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
}
pod.Name = podName

p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 1, " Suffix")
p, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 1, " Suffix", nil)
assert.NoError(t, err)
assert.Nil(t, p)
}
Expand Down Expand Up @@ -115,7 +135,7 @@ func TestGetLogsForContainerInPod_Cloudwatch(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 0, " Suffix")
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -145,7 +165,7 @@ func TestGetLogsForContainerInPod_K8s(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 0, " Suffix")
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -178,7 +198,7 @@ func TestGetLogsForContainerInPod_All(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 0, " Suffix")
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 2)
}
Expand Down Expand Up @@ -209,7 +229,7 @@ func TestGetLogsForContainerInPod_Stackdriver(t *testing.T) {
}
pod.Name = podName

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 0, " Suffix")
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " Suffix", nil)
assert.Nil(t, err)
assert.Len(t, logs, 1)
}
Expand Down Expand Up @@ -283,7 +303,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c
},
}

logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, pod, 0, " my-Suffix")
logs, err := GetLogsForContainerInPod(context.TODO(), logPlugin, dummyTaskExecID, pod, 0, " my-Suffix", nil)
assert.Nil(tb, err)
assert.Len(tb, logs, len(expectedTaskLogs))
if diff := deep.Equal(logs, expectedTaskLogs); len(diff) > 0 {
Expand All @@ -301,12 +321,25 @@ func TestGetLogsForContainerInPod_Templates(t *testing.T) {
},
MessageFormat: core.TaskLog_JSON,
},
{
DisplayName: "Internal",
TemplateURIs: []string{
"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs",
},
MessageFormat: core.TaskLog_JSON,
Scheme: tasklog.TemplateSchemeTaskExecution,
},
},
}, []*core.TaskLog{
{
Uri: "https://my-log-server/my-namespace/my-pod/ContainerName/ContainerID",
MessageFormat: core.TaskLog_JSON,
Name: "StackDriver my-Suffix",
},
{
Uri: "https://flyte.corp.net/console/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/n0/taskId/my-task-name/attempt/1/view/logs",
MessageFormat: core.TaskLog_JSON,
Name: "Internal my-Suffix",
},
})
}
56 changes: 42 additions & 14 deletions go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,58 @@
package tasklog

import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
import (
"regexp"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

//go:generate enumer --type=TemplateScheme --trimprefix=TemplateScheme -json -yaml

type TemplateScheme int

const (
TemplateSchemePod TemplateScheme = iota
TemplateSchemeTaskExecution
)

type TemplateVar struct {
Regex *regexp.Regexp
Value string
}

type TemplateVars []TemplateVar

type TemplateVarsByScheme struct {
Common TemplateVars
Pod TemplateVars
TaskExecution TemplateVars
}

// Input contains all available information about task's execution that a log plugin can use to construct task's
// log links.
type Input struct {
HostName string `json:"hostname"`
PodName string `json:"podName"`
Namespace string `json:"namespace"`
ContainerName string `json:"containerName"`
ContainerID string `json:"containerId"`
LogName string `json:"logName"`
PodRFC3339StartTime string `json:"podRFC3339StartTime"`
PodRFC3339FinishTime string `json:"podRFC3339FinishTime"`
PodUnixStartTime int64 `json:"podUnixStartTime"`
PodUnixFinishTime int64 `json:"podUnixFinishTime"`
PodUID string `json:"podUID"`
HostName string
PodName string
Namespace string
ContainerName string
ContainerID string
LogName string
PodRFC3339StartTime string
PodRFC3339FinishTime string
PodUnixStartTime int64
PodUnixFinishTime int64
PodUID string
TaskExecutionIdentifier *core.TaskExecutionIdentifier
ExtraTemplateVarsByScheme *TemplateVarsByScheme
}

// Output contains all task logs a plugin generates for a given Input.
type Output struct {
TaskLogs []*core.TaskLog `json:"taskLogs"`
TaskLogs []*core.TaskLog
}

// Plugin represents an interface for task log plugins to implement to plug generated task log links into task events.
type Plugin interface {
// Generates a TaskLog object given necessary computation information
GetTaskLogs(input Input) (logs Output, err error)
GetTaskLogs(i Input) (logs Output, err error)
}
Loading

0 comments on commit 7ddd993

Please sign in to comment.