Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Introduce Template-based task logging plugin #144

Merged
merged 9 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions go/tasks/logs/config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,43 @@
package logs

import "github.com/lyft/flyteplugins/go/tasks/config"
import (
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteplugins/go/tasks/config"
)

//go:generate pflags LogConfig

// A URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

// Log plugins configs
type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
// Deprecated: Please use KubernetesTemplateURI
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
// Deprecated: Please use StackDriverTemplateURI
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
// Deprecated: Please use StackDriverTemplateURI
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"`
}

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
type TemplateLogPluginConfig struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
}

var (
Expand Down
3 changes: 3 additions & 0 deletions go/tasks/logs/logconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 66 additions & 0 deletions go/tasks/logs/logconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 93 additions & 26 deletions go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,28 @@ package logs

import (
"context"
"fmt"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/tasklog"

logUtils "github.com/lyft/flyteidl/clients/go/coreutils/logs"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/api/core/v1"
)

func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
var logs []*core.TaskLog
logConfig := GetLogConfig()

logPlugins := map[string]logUtils.LogPlugin{}
type logPlugin struct {
Name string
Plugin tasklog.Plugin
}

if logConfig.IsKubernetesEnabled {
logPlugins["Kubernetes Logs"] = logUtils.NewKubernetesLogPlugin(logConfig.KubernetesURL)
}
if logConfig.IsCloudwatchEnabled {
logPlugins["Cloudwatch Logs"] = logUtils.NewCloudwatchLogPlugin(logConfig.CloudwatchRegion, logConfig.CloudwatchLogGroup)
}
if logConfig.IsStackDriverEnabled {
logPlugins["Stackdriver Logs"] = logUtils.NewStackdriverLogPlugin(logConfig.GCPProjectName, logConfig.StackdriverLogResourceName)
// Internal
func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
logPlugin, err := InitializeLogPlugins(GetLogConfig())
if err != nil {
return nil, err
}

if len(logPlugins) == 0 {
if logPlugin == nil {
return nil, nil
}

Expand All @@ -44,18 +42,87 @@ func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, na
return nil, nil
}

for name, plugin := range logPlugins {
log, err := plugin.GetTaskLog(
pod.Name,
pod.Namespace,
pod.Spec.Containers[index].Name,
pod.Status.ContainerStatuses[index].ContainerID,
name+nameSuffix,
)
logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: pod.Name,
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
},
)

if err != nil {
return nil, err
}

return logs.TaskLogs, nil
}

type taskLogPluginWrapper struct {
logPlugins []logPlugin
}

func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) {
logs := make([]*core.TaskLog, 0, len(t.logPlugins))
suffix := input.LogName
for _, plugin := range t.logPlugins {
input.LogName = plugin.Name + suffix
o, err := plugin.Plugin.GetTaskLogs(input)
if err != nil {
return nil, err
return tasklog.Output{}, err
}

logs = append(logs, o.TaskLogs...)
}

return tasklog.Output{
TaskLogs: logs,
}, nil
}

// Internal
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
// Use a list to maintain order.
logPlugins := make([]logPlugin, 0, 2)

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(
[]string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(
[]string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)})
}
logs = append(logs, &log)
}
return logs, nil

if len(cfg.Templates) > 0 {
for _, cfg := range cfg.Templates {
logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.TemplateURIs, cfg.MessageFormat)})
}
}

if len(logPlugins) == 0 {
return nil, nil
}

return taskLogPluginWrapper{
logPlugins: logPlugins,
}, nil
}
Loading