Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream logs from kube-task #228

Merged
merged 3 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions pkg/format/format.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package format

import (
"bufio"
"io"
"regexp"
"strings"

Expand All @@ -11,9 +13,30 @@ func Log(podName string, containerName string, output string) {
if output != "" {
logs := regexp.MustCompile("[\r\n]").Split(output, -1)
for _, l := range logs {
if strings.TrimSpace(l) != "" {
log.Info("Pod: ", podName, " Container: ", containerName, " Out: ", l)
}
info(podName, containerName, l)
}
}
}

func LogStream(podName string, containerName string, output io.ReadCloser) chan string {
logCh := make(chan string)
s := bufio.NewScanner(output)
go func() {
defer close(logCh)
for s.Scan() {
l := s.Text()
info(podName, containerName, l)
logCh <- l
}
if err := s.Err(); err != nil {
log.Error("Pod: ", podName, " Container: ", containerName, " Failed to stream log from pod: ", err.Error())
}
}()
return logCh
}

func info(podName string, containerName string, l string) {
if strings.TrimSpace(l) != "" {
log.Info("Pod: ", podName, " Container: ", containerName, " Out: ", l)
}
}
36 changes: 24 additions & 12 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,31 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
var op map[string]interface{}
logs := regexp.MustCompile("[\n]").Split(out, -1)
for _, l := range logs {
// Log should contain "###Phase-output###:" string
if strings.Contains(l, output.PhaseOpString) {
if op == nil {
op = make(map[string]interface{})
}
pattern := regexp.MustCompile(`###Phase-output###:(.*?)*$`)
match := pattern.FindAllStringSubmatch(l, 1)
opObj, err := output.UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
op[opObj.Key] = opObj.Value
opObj, err := parseLogLineForOutput(l)
if err != nil {
return nil, err
}
if opObj == nil {
continue
}
if op == nil {
op = make(map[string]interface{})
}
op[opObj.Key] = opObj.Value
}
return op, nil
}

var outputRE = regexp.MustCompile(`###Phase-output###:(.*?)*$`)

func parseLogLineForOutput(l string) (*output.Output, error) {
if !strings.Contains(l, output.PhaseOpString) {
return nil, nil
}
match := outputRE.FindAllStringSubmatch(l, 1)
op, err := output.UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,32 @@ func kubeTask(ctx context.Context, cli kubernetes.Interface, namespace, image st

func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
return func(ctx context.Context, pod *v1.Pod) (map[string]interface{}, error) {
// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
// Fetch logs from the pod
logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name)
r, err := kube.StreamPodLogs(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
format.Log(pod.Name, pod.Spec.Containers[0].Name, logs)
out, err := parseLogAndCreateOutput(logs)
return out, errors.Wrap(err, "Failed to parse phase output")
defer r.Close()
logCh := format.LogStream(pod.Name, pod.Spec.Containers[0].Name, r)
// Wait for pod completion
err = kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name)
if err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
out := make(map[string]interface{})
for l := range logCh {
op, err := parseLogLineForOutput(l)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse phase output")
}
if op != nil {
out[op.Key] = op.Value
}
}
return out, nil
}
}

Expand Down
113 changes: 74 additions & 39 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,61 +47,96 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) {
}
}

func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint {
func outputPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "kanisterio/kanister-tools:0.20.0",
KubeTaskCommandArg: []string{
"sh",
"-c",
"kando output version 0.20.0",
},
},
}
}

func sleepPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testSleep",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "ubuntu:latest",
KubeTaskCommandArg: []string{
"sleep",
"2",
},
},
}
}

func tickPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testTick",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "alpine:3.10",
KubeTaskCommandArg: []string{
"sh",
"-c",
`for i in $(seq 3); do echo Tick: "${i}"; sleep 1; done`,
},
},
}
}

func newTaskBlueprint(phases ...crv1alpha1.BlueprintPhase) *crv1alpha1.Blueprint {
return &crv1alpha1.Blueprint{
Actions: map[string]*crv1alpha1.BlueprintAction{
"test": {
Kind: "StatefulSet",
Phases: []crv1alpha1.BlueprintPhase{
{
Name: "testOutput",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "kanisterio/kanister-tools:0.20.0",
KubeTaskCommandArg: []string{
"sh",
"-c",
"kando output version 0.20.0",
},
},
},
{
Name: "testSleep",
Func: "KubeTask",
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: "ubuntu:latest",
KubeTaskCommandArg: []string{
"sleep",
"2",
},
},
},
},
Kind: "StatefulSet",
Phases: phases,
},
},
}
}

func (s *KubeTaskSuite) TestKubeTask(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
}

action := "test"
bp := newTaskBlueprint(s.namespace)
phases, err := kanister.GetPhases(*bp, action, tp)
c.Assert(err, IsNil)
for _, p := range phases {
out, err := p.Exec(ctx, *bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
if out != nil {
c.Assert(out["version"], NotNil)
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(outputPhase(s.namespace), sleepPhase(s.namespace), tickPhase(s.namespace)),
outs: []map[string]interface{}{
map[string]interface{}{
"version": "0.20.0",
},
map[string]interface{}{},
map[string]interface{}{},
},
},
} {

phases, err := kanister.GetPhases(*tc.bp, action, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}
28 changes: 15 additions & 13 deletions pkg/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kube

import (
"context"
"io"
"io/ioutil"

"github.com/pkg/errors"
Expand Down Expand Up @@ -64,6 +65,13 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *v1.Pod) error
return nil
}

func StreamPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (io.ReadCloser, error) {
plo := &v1.PodLogOptions{
Follow: true,
}
return cli.CoreV1().Pods(namespace).GetLogs(name, plo).Stream()
}

// GetPodLogs fetches the logs from the given pod
func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (string, error) {
reader, err := cli.CoreV1().Pods(namespace).GetLogs(name, &v1.PodLogOptions{}).Stream()
Expand All @@ -78,18 +86,15 @@ func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name s
return string(bytes), nil
}

// WaitForPodReady waits for a pod to reach Running state
// WaitForPodReady waits for a pod to exit the pending state
func WaitForPodReady(ctx context.Context, cli kubernetes.Interface, namespace, name string) error {
err := poll.Wait(ctx, func(ctx context.Context) (bool, error) {
p, err := cli.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
return (p.Status.Phase == v1.PodRunning), nil
return p.Status.Phase != v1.PodPending && p.Status.Phase != "", nil
})
if err == nil {
return nil
}
return errors.Wrapf(err, "Pod did not transition into running state. Namespace:%s, Name:%s", namespace, name)
}

Expand All @@ -100,20 +105,17 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa
if err != nil {
return true, err
}
if p.Status.Phase == v1.PodFailed {
return false, errors.Errorf("Pod %s failed", name)
}
if p.Status.Phase == v1.PodRunning {
switch p.Status.Phase {
case v1.PodRunning:
for _, con := range p.Status.ContainerStatuses {
if con.State.Terminated != nil {
return false, errors.Errorf("Container %v is terminated, while Pod %v is Running", con.Name, name)
}
}
case v1.PodFailed:
return false, errors.Errorf("Pod %s failed", name)
}
return (p.Status.Phase == v1.PodSucceeded), nil
return p.Status.Phase == v1.PodSucceeded, nil
})
if err == nil {
return nil
}
return errors.Wrap(err, "Pod did not transition into complete state")
}