From 0b636c6454c7e81df2d08cc9ecc58f2419f37033 Mon Sep 17 00:00:00 2001
From: Aaron Hetherington
Date: Mon, 8 May 2023 14:55:54 +0100
Subject: [PATCH] Fix for issue 736, refactor log streams for k8s pods

---
 pkg/workceptor/kubernetes.go | 425 ++++++++++++++++++-----------------
 1 file changed, 223 insertions(+), 202 deletions(-)

diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go
index 78ec744b0..ff34080a2 100644
--- a/pkg/workceptor/kubernetes.go
+++ b/pkg/workceptor/kubernetes.go
@@ -70,6 +70,9 @@ type kubeExtraData struct {
 // ErrPodCompleted is returned when pod has already completed before we could attach.
 var ErrPodCompleted = fmt.Errorf("pod ran to completion")
 
+// ErrPodFailed is returned when pod has failed before we could attach.
+var ErrPodFailed = fmt.Errorf("pod failed to start")
+
 // ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled.
 var ErrImagePullBackOff = fmt.Errorf("container failed to start")
 
@@ -82,7 +85,9 @@ func podRunningAndReady() func(event watch.Event) (bool, error) {
 		}
 		if t, ok := event.Object.(*corev1.Pod); ok {
 			switch t.Status.Phase {
-			case corev1.PodFailed, corev1.PodSucceeded:
+			case corev1.PodFailed:
+				return false, ErrPodFailed
+			case corev1.PodSucceeded:
 				return false, ErrPodCompleted
 			case corev1.PodRunning, corev1.PodPending:
 				conditions := t.Status.Conditions
@@ -118,6 +123,191 @@ func podRunningAndReady() func(event watch.Event) (bool, error) {
 	return inner
 }
 
+func (kw *kubeUnit) kubeLoggingConnectionHandler(timestamps bool) (io.ReadCloser, error) {
+	var logStream io.ReadCloser
+	var err error
+	var sinceTime time.Time
+	podNamespace := kw.pod.Namespace
+	podName := kw.pod.Name
+	podOptions := &corev1.PodLogOptions{
+		Container: "worker",
+		Follow:    true,
+	}
+	if timestamps {
+		podOptions.Timestamps = true
+		podOptions.SinceTime = &metav1.Time{Time: sinceTime}
+	}
+
+	logReq := kw.clientset.CoreV1().Pods(podNamespace).GetLogs(
+		podName, podOptions,
+	)
+	// get logstream, with retry
+	for retries := 5; retries > 0; retries-- {
+		logStream, err = logReq.Stream(kw.ctx)
+		if err == nil {
+			break
+		} else {
+			kw.Warning(
+				"Error opening log stream for pod %s/%s. Will retry %d more times. Error: %s",
+				podNamespace,
+				podName,
+				retries,
+				err,
+			)
+			time.Sleep(time.Second)
+		}
+	}
+	if err != nil {
+		errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Error: %s", podNamespace, podName, err)
+		kw.Error(errMsg)
+		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+
+		return nil, err
+	}
+
+	return logStream, nil
+}
+
+func (kw *kubeUnit) kubeLoggingNoReconnect(streamWait *sync.WaitGroup, stdout *stdoutWriter, stdoutErr *error) {
+	// Legacy method, for use on k8s < v1.23.14
+	// uses io.Copy to stream data from pod to stdout file
+	// known issues around this, as logstream can terminate due to log rotation
+	// or 4 hr timeout
+	defer streamWait.Done()
+	podNamespace := kw.pod.Namespace
+	podName := kw.pod.Name
+	logStream, err := kw.kubeLoggingConnectionHandler(false)
+	if err != nil {
+		return
+	}
+
+	_, *stdoutErr = io.Copy(stdout, logStream)
+	if *stdoutErr != nil {
+		kw.Error(
+			"Error streaming pod logs to stdout for pod %s/%s. Error: %s",
+			podNamespace,
+			podName,
+			*stdoutErr,
+		)
+	}
+}
+
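+// kubeLoggingWithReconnect streams pod logs to stdout like
+// kubeLoggingNoReconnect, but reopens the log stream from the last seen
+// timestamp whenever the stream drops (EOF, http2 GOAWAY), so it survives
+// log rotation and kube API server restarts.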
Error: %s", + podNamespace, + podName, + *stdoutErr, + ) + } +} + +func (kw *kubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *stdoutWriter, stdinErr *error, stdoutErr *error) { + // preferred method for k8s >= 1.23.14 + defer streamWait.Done() + var sinceTime time.Time + var err error + podNamespace := kw.pod.Namespace + podName := kw.pod.Name + + retries := 5 + successfulWrite := false + remainingRetries := retries // resets on each successful read from pod stdout + + for { + if *stdinErr != nil { + break + } + + // get pod, with retry + for retries := 5; retries > 0; retries-- { + kw.pod, err = kw.clientset.CoreV1().Pods(podNamespace).Get(kw.ctx, podName, metav1.GetOptions{}) + if err == nil { + break + } else { + kw.Warning( + "Error getting pod %s/%s. Will retry %d more times. Error: %s", + podNamespace, + podName, + retries, + err, + ) + time.Sleep(time.Second) + } + } + if err != nil { + errMsg := fmt.Sprintf("Error getting pod %s/%s. Error: %s", podNamespace, podName, err) + kw.Error(errMsg) + kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) + + break + } + + logStream, err := kw.kubeLoggingConnectionHandler(true) + if err != nil { + break + } + + // read from logstream + streamReader := bufio.NewReader(logStream) + for *stdinErr == nil { // check between every line read to see if we need to stop reading + line, err := streamReader.ReadString('\n') + if err == io.EOF { + kw.Debug( + "Detected EOF for pod %s/%s. Will retry %d more times. Error: %s", + podNamespace, + podName, + remainingRetries, + err, + ) + successfulWrite = false + remainingRetries-- + if remainingRetries > 0 { + time.Sleep(200 * time.Millisecond) + + break + } + + return + } else if _, ok := err.(http2.GoAwayError); ok { + // GOAWAY is sent by the server to indicate that the server is gracefully shutting down + // this happens if the kube API server we are connected to is being restarted or is shutting down + // for example during a cluster upgrade and rolling restart of the master node + kw.Info( + "Detected http2.GoAwayError for pod %s/%s. Will retry %d more times. 
Error: %s", + podNamespace, + podName, + remainingRetries, + err, + ) + successfulWrite = false + remainingRetries-- + if remainingRetries > 0 { + time.Sleep(200 * time.Millisecond) + + break + } + } + if err != nil { + *stdoutErr = err + kw.Error("Error reading from pod %s/%s: %s", podNamespace, podName, err) + + return + } + + split := strings.SplitN(line, " ", 2) + timeStamp := parseTime(split[0]) + if !timeStamp.After(sinceTime) && !successfulWrite { + continue + } + msg := split[1] + + _, err = stdout.Write([]byte(msg)) + if err != nil { + *stdoutErr = fmt.Errorf("writing to stdout: %s", err) + kw.Error("Error writing to stdout: %s", err) + + return + } + remainingRetries = retries // each time we read successfully, reset this counter + sinceTime = *timeStamp + successfulWrite = true + } + + logStream.Close() + } +} + func (kw *kubeUnit) createPod(env map[string]string) error { ked := kw.UnredactedStatus().ExtraData.(*kubeExtraData) command, err := shlex.Split(ked.Command) @@ -239,6 +429,7 @@ func (kw *kubeUnit) createPod(env map[string]string) error { ctxPodReady, _ = context.WithTimeout(kw.ctx, kw.podPendingTimeout) } + time.Sleep(2 * time.Second) ev, err := watch2.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady()) if ev == nil || ev.Object == nil { return fmt.Errorf("did not return an event while watching pod for work unit %s", kw.ID()) @@ -264,14 +455,37 @@ func (kw *kubeUnit) createPod(env map[string]string) error { return err } else if err != nil { // any other error besides ErrPodCompleted + stdout, err2 := newStdoutWriter(kw.UnitDir()) + if err2 != nil { + errMsg := fmt.Sprintf("Error opening stdout file: %s", err2) + kw.Error(errMsg) + kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) + + return fmt.Errorf(errMsg) + } + var stdoutErr error + var streamWait sync.WaitGroup + streamWait.Add(1) + go kw.kubeLoggingNoReconnect(&streamWait, stdout, &stdoutErr) + streamWait.Wait() kw.Cancel() - for _, cstat := range kw.pod.Status.ContainerStatuses { - if cstat.Name == "worker" { - if cstat.State.Waiting != nil { - return fmt.Errorf("%s, %s", err.Error(), cstat.State.Waiting.Reason) - } + if len(kw.pod.Status.ContainerStatuses) == 1 { + if kw.pod.Status.ContainerStatuses[0].State.Waiting != nil { + return fmt.Errorf("%s, %s", err.Error(), kw.pod.Status.ContainerStatuses[0].State.Waiting.Reason) + } - break + for _, cstat := range kw.pod.Status.ContainerStatuses { + if cstat.Name == "worker" { + if cstat.State.Waiting != nil { + return fmt.Errorf("%s, %s", err.Error(), cstat.State.Waiting.Reason) + } + + if cstat.State.Terminated != nil && cstat.State.Terminated.ExitCode != 0 { + return fmt.Errorf("%s, exit code %d: %s", err.Error(), cstat.State.Terminated.ExitCode, cstat.State.Terminated.Message) + } + + break + } } } @@ -505,206 +719,13 @@ func (kw *kubeUnit) runWorkUsingLogger() { }() } - noReconnect := func() { - // Legacy method, for use on k8s < v1.23.14 - // uses io.Copy to stream data from pod to stdout file - // known issues around this, as logstream can terminate due to log rotation - // or 4 hr timeout - defer streamWait.Done() - var logStream io.ReadCloser - logReq := kw.clientset.CoreV1().Pods(podNamespace).GetLogs( - podName, &corev1.PodLogOptions{ - Container: "worker", - Follow: true, - }, - ) - // get logstream, with retry - for retries := 5; retries > 0; retries-- { - logStream, err = logReq.Stream(kw.ctx) - if err == nil { - break - } else { - kw.Warning( - "Error opening log stream for pod %s/%s. Will retry %d more times. 
Error: %s", - podNamespace, - podName, - retries, - err, - ) - time.Sleep(time.Second) - } - } - if err != nil { - errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Error: %s", podNamespace, podName, err) - kw.Error(errMsg) - kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) - - return - } - - _, stdoutErr = io.Copy(stdout, logStream) - if stdoutErr != nil { - kw.Error( - "Error streaming pod logs to stdout for pod %s/%s. Error: %s", - podNamespace, - podName, - stdoutErr, - ) - } - } - - withReconnect := func() { - // preferred method for k8s >= 1.23.14 - defer streamWait.Done() - var sinceTime time.Time - var logStream io.ReadCloser - retries := 5 - successfulWrite := false - remainingRetries := retries // resets on each successful read from pod stdout - - for { - if stdinErr != nil { - break - } - - // get pod, with retry - for retries := 5; retries > 0; retries-- { - kw.pod, err = kw.clientset.CoreV1().Pods(podNamespace).Get(kw.ctx, podName, metav1.GetOptions{}) - if err == nil { - break - } else { - kw.Warning( - "Error getting pod %s/%s. Will retry %d more times. Error: %s", - podNamespace, - podName, - retries, - err, - ) - time.Sleep(time.Second) - } - } - if err != nil { - errMsg := fmt.Sprintf("Error getting pod %s/%s. Error: %s", podNamespace, podName, err) - kw.Error(errMsg) - kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) - - break - } - - logReq := kw.clientset.CoreV1().Pods(podNamespace).GetLogs( - podName, &corev1.PodLogOptions{ - Container: "worker", - Follow: true, - Timestamps: true, - SinceTime: &metav1.Time{Time: sinceTime}, - }, - ) - // get logstream, with retry - for retries := 5; retries > 0; retries-- { - logStream, err = logReq.Stream(kw.ctx) - if err == nil { - break - } else { - kw.Warning( - "Error opening log stream for pod %s/%s. Will retry %d more times. Error: %s", - podNamespace, - podName, - retries, - err, - ) - time.Sleep(time.Second) - } - } - if err != nil { - errMsg := fmt.Sprintf( - "Error opening log stream for pod %s/%s. Error: %s", - podNamespace, - podName, - err, - ) - kw.Error(errMsg) - kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) - - break - } - - // read from logstream - streamReader := bufio.NewReader(logStream) - for stdinErr == nil { // check between every line read to see if we need to stop reading - line, err := streamReader.ReadString('\n') - if err == io.EOF { - kw.Debug( - "Detected EOF for pod %s/%s. Will retry %d more times. Error: %s", - podNamespace, - podName, - remainingRetries, - err, - ) - successfulWrite = false - remainingRetries-- - if remainingRetries > 0 { - time.Sleep(200 * time.Millisecond) - - break - } - - return - } else if _, ok := err.(http2.GoAwayError); ok { - // GOAWAY is sent by the server to indicate that the server is gracefully shutting down - // this happens if the kube API server we are connected to is being restarted or is shutting down - // for example during a cluster upgrade and rolling restart of the master node - kw.Info( - "Detected http2.GoAwayError for pod %s/%s. Will retry %d more times. 
Error: %s", - podNamespace, - podName, - remainingRetries, - err, - ) - successfulWrite = false - remainingRetries-- - if remainingRetries > 0 { - time.Sleep(200 * time.Millisecond) - - break - } - } - if err != nil { - stdoutErr = err - kw.Error("Error reading from pod %s/%s: %s", podNamespace, podName, err) - - return - } - - split := strings.SplitN(line, " ", 2) - timeStamp := parseTime(split[0]) - if !timeStamp.After(sinceTime) && !successfulWrite { - continue - } - msg := split[1] - - _, err = stdout.Write([]byte(msg)) - if err != nil { - stdoutErr = fmt.Errorf("writing to stdout: %s", err) - kw.Error("Error writing to stdout: %s", err) - - return - } - remainingRetries = retries // each time we read successfully, reset this counter - sinceTime = *timeStamp - successfulWrite = true - } - - logStream.Close() - } - } - stdoutWithReconnect := shouldUseReconnect(kw) if stdoutWithReconnect && stdoutErr == nil { kw.Debug("streaming stdout with reconnect support") - go withReconnect() + go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr) } else { kw.Debug("streaming stdout with no reconnect support") - go noReconnect() + go kw.kubeLoggingNoReconnect(&streamWait, stdout, &stdoutErr) } streamWait.Wait()