diff --git a/test/e2e/cluster_upgrade_runtimesdk.go b/test/e2e/cluster_upgrade_runtimesdk.go index 9c8747628624..600712830731 100644 --- a/test/e2e/cluster_upgrade_runtimesdk.go +++ b/test/e2e/cluster_upgrade_runtimesdk.go @@ -157,7 +157,7 @@ func clusterUpgradeWithRuntimeSDKSpec(ctx context.Context, inputGetter func() cl }) By("Watch Deployment logs of test extension") - framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{ + framework.WatchDeploymentLogsByDeployment(ctx, framework.WatchDeploymentLogsByDeploymentInput{ GetLister: input.BootstrapClusterProxy.GetClient(), ClientSet: input.BootstrapClusterProxy.GetClientSet(), Deployment: &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "test-extension", Namespace: namespace.Name}}, diff --git a/test/framework/clusterctl/clusterctl_helpers.go b/test/framework/clusterctl/clusterctl_helpers.go index 5f18ceb6e3fc..5fb0c5cf5c36 100644 --- a/test/framework/clusterctl/clusterctl_helpers.go +++ b/test/framework/clusterctl/clusterctl_helpers.go @@ -103,7 +103,7 @@ func InitManagementClusterAndWatchControllerLogs(ctx context.Context, input Init }, intervals...) // Start streaming logs from all controller providers - framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{ + framework.WatchDeploymentLogsByDeployment(ctx, framework.WatchDeploymentLogsByDeploymentInput{ GetLister: client, ClientSet: input.ClusterProxy.GetClientSet(), Deployment: deployment, @@ -159,7 +159,7 @@ func UpgradeManagementClusterAndWait(ctx context.Context, input UpgradeManagemen }, intervals...) 
// Start streaming logs from all controller providers - framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{ + framework.WatchDeploymentLogsByDeployment(ctx, framework.WatchDeploymentLogsByDeploymentInput{ GetLister: client, ClientSet: input.ClusterProxy.GetClientSet(), Deployment: deployment, diff --git a/test/framework/deployment_helpers.go b/test/framework/deployment_helpers.go index a247ec2aa10f..fc02148a7bfa 100644 --- a/test/framework/deployment_helpers.go +++ b/test/framework/deployment_helpers.go @@ -102,14 +102,23 @@ type WatchDeploymentLogsBySelectorInput struct { LogPath string } -// WatchDeploymentLogsInput is the input for WatchDeploymentLogs. -type WatchDeploymentLogsInput struct { +// WatchDeploymentLogsByDeploymentInput is the input for WatchDeploymentLogsByDeployment. +type WatchDeploymentLogsByDeploymentInput struct { GetLister GetLister ClientSet *kubernetes.Clientset Deployment *appsv1.Deployment LogPath string } +// watchDeploymentLogsInput is the input for watchDeploymentLogs. +type watchDeploymentLogsInput struct { + GetLister GetLister + ClientSet *kubernetes.Clientset + Deployment *appsv1.Deployment + Labels map[string]string + LogPath string +} + // logMetadata contains metadata about the logs. // The format is very similar to the one used by promtail. 
type logMetadata struct { @@ -138,72 +147,22 @@ func WatchDeploymentLogsByLabelSelector(ctx context.Context, input WatchDeployme }, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get deployment for labels") for _, deployment := range deploymentList.Items { - pods := &corev1.PodList{} - Expect(input.GetLister.List(ctx, pods, client.InNamespace(deployment.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s/%s", deployment.Namespace, deployment.Name) - for _, pod := range pods.Items { - for _, container := range deployment.Spec.Template.Spec.Containers { - log.Logf("Creating log watcher for controller %s/%s, pod %s, container %s", deployment.Namespace, deployment.Name, pod.Name, container.Name) - - // Create log metadata file. - logMetadataFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+"-log-metadata.json")) - Expect(os.MkdirAll(filepath.Dir(logMetadataFile), 0750)).To(Succeed()) - - metadata := logMetadata{ - Job: deployment.Namespace + "/" + deployment.Name, - Namespace: deployment.Namespace, - App: deployment.Name, - Pod: pod.Name, - Container: container.Name, - NodeName: pod.Spec.NodeName, - Stream: "stderr", - Labels: input.Labels, - } - metadataBytes, err := json.Marshal(&metadata) - Expect(err).To(BeNil()) - Expect(os.WriteFile(logMetadataFile, metadataBytes, 0600)).To(Succeed()) - - // Watch each container's logs in a goroutine so we can stream them all concurrently. 
- go func(pod corev1.Pod, container corev1.Container) { - defer GinkgoRecover() - - logFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+".log")) - Expect(os.MkdirAll(filepath.Dir(logFile), 0750)).To(Succeed()) - - f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) - Expect(err).NotTo(HaveOccurred()) - defer f.Close() - - opts := &corev1.PodLogOptions{ - Container: container.Name, - Follow: true, - } - - podLogs, err := input.ClientSet.CoreV1().Pods(deployment.Namespace).GetLogs(pod.Name, opts).Stream(ctx) - if err != nil { - // Failing to stream logs should not cause the test to fail - log.Logf("Error starting logs stream for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err) - return - } - defer podLogs.Close() - - out := bufio.NewWriter(f) - defer out.Flush() - _, err = out.ReadFrom(podLogs) - if err != nil && err != io.ErrUnexpectedEOF { - // Failing to stream logs should not cause the test to fail - log.Logf("Got error while streaming logs for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err) - } - }(pod, container) - } - } + deployment := deployment + watchDeploymentLogs(ctx, watchDeploymentLogsInput{ + GetLister: input.GetLister, + ClientSet: input.ClientSet, + Deployment: &deployment, + Labels: deployment.Spec.Selector.MatchLabels, + LogPath: input.LogPath, + }) } } -// WatchDeploymentLogs streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed +// WatchDeploymentLogsByDeployment streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed // in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors // retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself, // that does not cause the test to fail. 
-func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) { +func WatchDeploymentLogsByDeployment(ctx context.Context, input WatchDeploymentLogsByDeploymentInput) { Expect(ctx).NotTo(BeNil(), "ctx is required for WatchControllerLogs") Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchControllerLogs") Expect(input.Deployment).NotTo(BeNil(), "input.Deployment is required for WatchControllerLogs") @@ -216,12 +175,26 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) { selector, err := metav1.LabelSelectorAsMap(deployment.Spec.Selector) Expect(err).NotTo(HaveOccurred(), "Failed to Pods selector for deployment %s", klog.KObj(input.Deployment)) + watchDeploymentLogs(ctx, watchDeploymentLogsInput{ + GetLister: input.GetLister, + ClientSet: input.ClientSet, + Deployment: deployment, + Labels: selector, + LogPath: input.LogPath, + }) +} + +// watchDeploymentLogs streams logs for all containers for all pods belonging to a deployment with the given labels. Each container's logs are streamed +// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors +// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself, +// that does not cause the test to fail. 
+func watchDeploymentLogs(ctx context.Context, input watchDeploymentLogsInput) { pods := &corev1.PodList{} - Expect(input.GetLister.List(ctx, pods, client.InNamespace(input.Deployment.Namespace), client.MatchingLabels(selector))).To(Succeed(), "Failed to list Pods for deployment %s", klog.KObj(input.Deployment)) + Expect(input.GetLister.List(ctx, pods, client.InNamespace(input.Deployment.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s", klog.KObj(input.Deployment)) for _, pod := range pods.Items { - for _, container := range deployment.Spec.Template.Spec.Containers { + for _, container := range input.Deployment.Spec.Template.Spec.Containers { log.Logf("Creating log watcher for controller %s, pod %s, container %s", klog.KObj(input.Deployment), pod.Name, container.Name) // Create log metadata file. @@ -236,6 +209,7 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) { Container: container.Name, NodeName: pod.Spec.NodeName, Stream: "stderr", + Labels: input.Labels, } metadataBytes, err := json.Marshal(&metadata) Expect(err).To(BeNil())