diff --git a/test/framework/deployment_helpers.go b/test/framework/deployment_helpers.go
index 213dbeccbe6e..a247ec2aa10f 100644
--- a/test/framework/deployment_helpers.go
+++ b/test/framework/deployment_helpers.go
@@ -94,6 +94,14 @@ func DescribeFailedDeployment(input WaitForDeploymentsAvailableInput, deployment
 	return b.String()
 }
 
+// WatchDeploymentLogsBySelectorInput is the input for WatchDeploymentLogsByLabelSelector.
+type WatchDeploymentLogsBySelectorInput struct {
+	GetLister GetLister
+	ClientSet *kubernetes.Clientset
+	Labels    map[string]string
+	LogPath   string
+}
+
 // WatchDeploymentLogsInput is the input for WatchDeploymentLogs.
 type WatchDeploymentLogsInput struct {
 	GetLister  GetLister
@@ -105,13 +113,90 @@ type WatchDeploymentLogsInput struct {
 // logMetadata contains metadata about the logs.
 // The format is very similar to the one used by promtail.
 type logMetadata struct {
-	Job       string `json:"job"`
-	Namespace string `json:"namespace"`
-	App       string `json:"app"`
-	Pod       string `json:"pod"`
-	Container string `json:"container"`
-	NodeName  string `json:"node_name"`
-	Stream    string `json:"stream"`
+	Job       string            `json:"job"`
+	Namespace string            `json:"namespace"`
+	App       string            `json:"app"`
+	Pod       string            `json:"pod"`
+	Container string            `json:"container"`
+	NodeName  string            `json:"node_name"`
+	Stream    string            `json:"stream"`
+	Labels    map[string]string `json:"labels,omitempty"`
+}
+
+// WatchDeploymentLogsByLabelSelector streams logs for all containers of all pods belonging to the deployments matching
+// the given label selector. Each container's logs are streamed in a separate goroutine so they can all be streamed
+// concurrently. This only causes a test failure if there are errors retrieving the deployments, their pods, or setting
+// up a log file. If there is an error with the log streaming itself, that does not cause the test to fail.
+func WatchDeploymentLogsByLabelSelector(ctx context.Context, input WatchDeploymentLogsBySelectorInput) {
+	Expect(ctx).NotTo(BeNil(), "ctx is required for WatchDeploymentLogsByLabelSelector")
+	Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchDeploymentLogsByLabelSelector")
+	Expect(input.Labels).NotTo(BeNil(), "input.Labels is required for WatchDeploymentLogsByLabelSelector")
+
+	deploymentList := &appsv1.DeploymentList{}
+	Eventually(func() error {
+		return input.GetLister.List(ctx, deploymentList, client.MatchingLabels(input.Labels))
+	}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to list deployments for labels")
+
+	for _, deployment := range deploymentList.Items {
+		pods := &corev1.PodList{}
+		Expect(input.GetLister.List(ctx, pods, client.InNamespace(deployment.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s/%s", deployment.Namespace, deployment.Name)
+		for _, pod := range pods.Items {
+			for _, container := range deployment.Spec.Template.Spec.Containers {
+				log.Logf("Creating log watcher for controller %s/%s, pod %s, container %s", deployment.Namespace, deployment.Name, pod.Name, container.Name)
+
+				// Create log metadata file.
+				logMetadataFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+"-log-metadata.json"))
+				Expect(os.MkdirAll(filepath.Dir(logMetadataFile), 0750)).To(Succeed())
+
+				metadata := logMetadata{
+					Job:       deployment.Namespace + "/" + deployment.Name,
+					Namespace: deployment.Namespace,
+					App:       deployment.Name,
+					Pod:       pod.Name,
+					Container: container.Name,
+					NodeName:  pod.Spec.NodeName,
+					Stream:    "stderr",
+					Labels:    input.Labels,
+				}
+				metadataBytes, err := json.Marshal(&metadata)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(os.WriteFile(logMetadataFile, metadataBytes, 0600)).To(Succeed())
+
+				// Watch each container's logs in a goroutine so we can stream them all concurrently; pass the loop variables as arguments so each goroutine gets its own copies.
+				go func(deployment appsv1.Deployment, pod corev1.Pod, container corev1.Container) {
+					defer GinkgoRecover()
+
+					logFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+".log"))
+					Expect(os.MkdirAll(filepath.Dir(logFile), 0750)).To(Succeed())
+
+					f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
+					Expect(err).NotTo(HaveOccurred())
+					defer f.Close()
+
+					opts := &corev1.PodLogOptions{
+						Container: container.Name,
+						Follow:    true,
+					}
+
+					podLogs, err := input.ClientSet.CoreV1().Pods(deployment.Namespace).GetLogs(pod.Name, opts).Stream(ctx)
+					if err != nil {
+						// Failing to stream logs should not cause the test to fail.
+						log.Logf("Error starting logs stream for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err)
+						return
+					}
+					defer podLogs.Close()
+
+					out := bufio.NewWriter(f)
+					defer out.Flush()
+					_, err = out.ReadFrom(podLogs)
+					if err != nil && err != io.ErrUnexpectedEOF {
+						// Failing to stream logs should not cause the test to fail.
+						log.Logf("Got error while streaming logs for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err)
+					}
+				}(deployment, pod, container)
+			}
+		}
+	}
 }
 
 // WatchDeploymentLogs streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed
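
For context, a minimal sketch of how a test suite might wire up the new helper. It is not part of this diff: the helper function name, the label value, and the artifact folder below are illustrative assumptions; `framework.ClusterProxy`, `GetClient()`, and `GetClientSet()` are the existing cluster-api e2e accessors, and any controller-runtime client satisfies `GetLister`.

import (
	"context"
	"path/filepath"

	"sigs.k8s.io/cluster-api/test/framework"
)

// watchProviderLogs is an illustrative helper (not part of this change). It assumes
// the calling e2e suite already has a framework.ClusterProxy and an artifact folder;
// the provider label is a hypothetical selector.
func watchProviderLogs(ctx context.Context, proxy framework.ClusterProxy, artifactFolder string) {
	framework.WatchDeploymentLogsByLabelSelector(ctx, framework.WatchDeploymentLogsBySelectorInput{
		GetLister: proxy.GetClient(),    // controller-runtime client; used to list Deployments and Pods
		ClientSet: proxy.GetClientSet(), // used to stream the container logs
		Labels:    map[string]string{"cluster.x-k8s.io/provider": "infrastructure-docker"},
		LogPath:   filepath.Join(artifactFolder, "controllers"),
	})
}

Because streaming errors are only logged, a call like this can be made once at suite setup and left running; watchers for deployments that disappear mid-test will stop without failing the run.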