Skip to content

Commit

Permalink
Fix taskrun log throwing error
Browse files Browse the repository at this point in the history
This will fix the issue of taskrun log
throwing error if the pod name is
not yet available in status

This will wait for pod name to be
available in status for follow mode
till 10 secs

And in case of non follow mode it
will check whether pod name is there
or not

Adds test for the scenarios
  • Loading branch information
piyush-garg authored and tekton-robot committed Nov 5, 2019
1 parent 7e7edf8 commit e0d2ecd
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 13 deletions.
3 changes: 1 addition & 2 deletions pkg/cmd/task/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
"fmt"
"strings"

"github.com/tektoncd/cli/pkg/cmd/taskrun"

"github.com/spf13/cobra"
"github.com/tektoncd/cli/pkg/cli"
"github.com/tektoncd/cli/pkg/cmd/taskrun"
"github.com/tektoncd/cli/pkg/flags"
"github.com/tektoncd/cli/pkg/helper/labels"
"github.com/tektoncd/cli/pkg/helper/params"
Expand Down
66 changes: 60 additions & 6 deletions pkg/cmd/taskrun/log_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package taskrun

import (
"fmt"
"time"

"github.com/pkg/errors"
"github.com/tektoncd/cli/pkg/cli"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
)

type step struct {
Expand Down Expand Up @@ -53,6 +55,7 @@ type LogReader struct {
Streamer stream.NewStreamerFunc
Follow bool
AllSteps bool
Stream *cli.Stream
}

func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {
Expand All @@ -69,7 +72,7 @@ func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {

func (lr *LogReader) readLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan error, error) {
if lr.Follow {
return lr.readLiveLogs(tr)
return lr.readLiveLogs()
}
return lr.readAvailableLogs(tr)
}
Expand All @@ -92,7 +95,12 @@ func (lr *LogReader) formTaskName(tr *v1alpha1.TaskRun) {
lr.Task = fmt.Sprintf("Task %d", lr.Number)
}

func (lr *LogReader) readLiveLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan error, error) {
func (lr *LogReader) readLiveLogs() (<-chan Log, <-chan error, error) {
tr, err := lr.waitUntilPodNameAvailable(10)
if err != nil {
return nil, nil, err
}

var (
podName = tr.Status.PodName
kube = lr.Clients.Kube
Expand All @@ -110,15 +118,19 @@ func (lr *LogReader) readLiveLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan erro
}

func (lr *LogReader) readAvailableLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan error, error) {
if !tr.HasStarted() {
return nil, nil, fmt.Errorf("task %s has not started yet", lr.Task)
}

if tr.Status.PodName == "" {
return nil, nil, fmt.Errorf("pod for taskrun %s not available yet", tr.Name)
}

var (
kube = lr.Clients.Kube
podName = tr.Status.PodName
)

if !tr.HasStarted() {
return nil, nil, fmt.Errorf("task %s has not hasStarted yet", lr.Task)
}

p := pods.New(podName, lr.Ns, kube, lr.Streamer)
pod, err := p.Get()
if err != nil {
Expand Down Expand Up @@ -227,3 +239,45 @@ func getSteps(pod *corev1.Pod) []*step {

return steps
}

// reading of logs should wait untill the name of pod is
// updates in the status. Open a watch channel on run
// and keep checking the status until the pod name updatea
// or the timeout is reached
func (lr *LogReader) waitUntilPodNameAvailable(timeout time.Duration) (*v1alpha1.TaskRun, error) {
var first = true
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", lr.Run).String(),
}
tkn := lr.Clients.Tekton
run, err := tkn.TektonV1alpha1().TaskRuns(lr.Ns).Get(lr.Run, metav1.GetOptions{})
if err != nil {
return nil, err
}

if run.Status.PodName != "" {
return run, nil
}

watchRun, err := tkn.TektonV1alpha1().TaskRuns(lr.Ns).Watch(opts)
if err != nil {
return nil, err
}
for {
select {
case event := <-watchRun.ResultChan():
run := event.Object.(*v1alpha1.TaskRun)
if run.Status.PodName != "" {
watchRun.Stop()
return run, nil
}
if first {
first = false
fmt.Fprintln(lr.Stream.Out, "Task still running ...")
}
case <-time.After(timeout * time.Second):
watchRun.Stop()
return nil, fmt.Errorf("task %s create failed or has not started yet or pod for task not yet available", lr.Task)
}
}
}
2 changes: 1 addition & 1 deletion pkg/cmd/taskrun/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (lw *LogWriter) Write(s *cli.Stream, logC <-chan Log, errC <-chan error) {
errC = nil
continue
}
lw.fmt.Error(s.Out, "%s\n", e)
lw.fmt.Error(s.Err, "%s\n", e)
}
}
}
12 changes: 8 additions & 4 deletions pkg/cmd/taskrun/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ tkn taskrun logs -f foo -n bar
return err
}

opts.Streamer = pods.NewStream

return opts.Run()
},
}
Expand All @@ -83,7 +81,12 @@ tkn taskrun logs -f foo -n bar

func (lo *LogOptions) Run() error {
if lo.TaskrunName == "" {
return fmt.Errorf("missing mandatory argument taskrun")
return fmt.Errorf("missing mandatory argument taskrun name")
}

streamer := pods.NewStream
if lo.Streamer != nil {
streamer = lo.Streamer
}

cs, err := lo.Params.Clients()
Expand All @@ -95,7 +98,8 @@ func (lo *LogOptions) Run() error {
Run: lo.TaskrunName,
Ns: lo.Params.Namespace(),
Clients: cs,
Streamer: lo.Streamer,
Streamer: streamer,
Stream: lo.Stream,
Follow: lo.Follow,
AllSteps: lo.AllSteps,
}
Expand Down
Loading

0 comments on commit e0d2ecd

Please sign in to comment.