refactoring watchDeploymentLogs functions
adilGhaffarDev committed Aug 10, 2022
1 parent e6abdc7 commit 8b86052
Showing 3 changed files with 41 additions and 67 deletions.
test/e2e/cluster_upgrade_runtimesdk.go (1 addition, 1 deletion)
@@ -157,7 +157,7 @@ func clusterUpgradeWithRuntimeSDKSpec(ctx context.Context, inputGetter func() cl
})

By("Watch Deployment logs of test extension")
-framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{
+framework.WatchDeploymentLogsByDeployment(ctx, framework.WatchDeploymentLogsByDeploymentInput{
GetLister: input.BootstrapClusterProxy.GetClient(),
ClientSet: input.BootstrapClusterProxy.GetClientSet(),
Deployment: &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "test-extension", Namespace: namespace.Name}},
test/framework/clusterctl/clusterctl_helpers.go (2 additions, 2 deletions)
@@ -103,7 +103,7 @@ func InitManagementClusterAndWatchControllerLogs(ctx context.Context, input Init
}, intervals...)

// Start streaming logs from all controller providers
-framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{
+framework.WatchDeploymentLogsByDeployment(ctx, framework.WatchDeploymentLogsByDeploymentInput{
GetLister: client,
ClientSet: input.ClusterProxy.GetClientSet(),
Deployment: deployment,
@@ -159,7 +159,7 @@ func UpgradeManagementClusterAndWait(ctx context.Context, input UpgradeManagemen
}, intervals...)

// Start streaming logs from all controller providers
-framework.WatchDeploymentLogs(ctx, framework.WatchDeploymentLogsInput{
+framework.WatchDeploymentLogsByDeployment(ctx, framework.WatchDeploymentLogsByDeploymentInput{
GetLister: client,
ClientSet: input.ClusterProxy.GetClientSet(),
Deployment: deployment,
test/framework/deployment_helpers.go (38 additions, 64 deletions)
@@ -102,14 +102,23 @@ type WatchDeploymentLogsBySelectorInput struct {
LogPath string
}

-// WatchDeploymentLogsInput is the input for WatchDeploymentLogs.
-type WatchDeploymentLogsInput struct {
+// WatchDeploymentLogsByDeploymentInput is the input for WatchDeploymentLogsByDeployment.
+type WatchDeploymentLogsByDeploymentInput struct {
GetLister GetLister
ClientSet *kubernetes.Clientset
Deployment *appsv1.Deployment
LogPath string
}

+// watchDeploymentLogsInput is the input for watchDeploymentLogs.
+type watchDeploymentLogsInput struct {
+GetLister GetLister
+ClientSet *kubernetes.Clientset
+Deployment *appsv1.Deployment
+Labels map[string]string
+LogPath string
+}

// logMetadata contains metadata about the logs.
// The format is very similar to the one used by promtail.
type logMetadata struct {
@@ -138,72 +147,22 @@ func WatchDeploymentLogsByLabelSelector(ctx context.Context, input WatchDeploymen
}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get deployment for labels")

for _, deployment := range deploymentList.Items {
-pods := &corev1.PodList{}
-Expect(input.GetLister.List(ctx, pods, client.InNamespace(deployment.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s/%s", deployment.Namespace, deployment.Name)
-for _, pod := range pods.Items {
-for _, container := range deployment.Spec.Template.Spec.Containers {
-log.Logf("Creating log watcher for controller %s/%s, pod %s, container %s", deployment.Namespace, deployment.Name, pod.Name, container.Name)
-
-// Create log metadata file.
-logMetadataFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+"-log-metadata.json"))
-Expect(os.MkdirAll(filepath.Dir(logMetadataFile), 0750)).To(Succeed())
-
-metadata := logMetadata{
-Job: deployment.Namespace + "/" + deployment.Name,
-Namespace: deployment.Namespace,
-App: deployment.Name,
-Pod: pod.Name,
-Container: container.Name,
-NodeName: pod.Spec.NodeName,
-Stream: "stderr",
-Labels: input.Labels,
-}
-metadataBytes, err := json.Marshal(&metadata)
-Expect(err).To(BeNil())
-Expect(os.WriteFile(logMetadataFile, metadataBytes, 0600)).To(Succeed())
-
-// Watch each container's logs in a goroutine so we can stream them all concurrently.
-go func(pod corev1.Pod, container corev1.Container) {
-defer GinkgoRecover()
-
-logFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+".log"))
-Expect(os.MkdirAll(filepath.Dir(logFile), 0750)).To(Succeed())
-
-f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
-Expect(err).NotTo(HaveOccurred())
-defer f.Close()
-
-opts := &corev1.PodLogOptions{
-Container: container.Name,
-Follow: true,
-}
-
-podLogs, err := input.ClientSet.CoreV1().Pods(deployment.Namespace).GetLogs(pod.Name, opts).Stream(ctx)
-if err != nil {
-// Failing to stream logs should not cause the test to fail
-log.Logf("Error starting logs stream for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err)
-return
-}
-defer podLogs.Close()
-
-out := bufio.NewWriter(f)
-defer out.Flush()
-_, err = out.ReadFrom(podLogs)
-if err != nil && err != io.ErrUnexpectedEOF {
-// Failing to stream logs should not cause the test to fail
-log.Logf("Got error while streaming logs for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err)
-}
-}(pod, container)
-}
-}
+deployment := deployment
+watchDeploymentLogs(ctx, watchDeploymentLogsInput{
+GetLister: input.GetLister,
+ClientSet: input.ClientSet,
+Deployment: &deployment,
+Labels: deployment.Spec.Selector.MatchLabels,
+LogPath: input.LogPath,
+})
}
}

-// WatchDeploymentLogs streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed
+// WatchDeploymentLogsByDeployment streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed
// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors
// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself,
// that does not cause the test to fail.
-func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
+func WatchDeploymentLogsByDeployment(ctx context.Context, input WatchDeploymentLogsByDeploymentInput) {
Expect(ctx).NotTo(BeNil(), "ctx is required for WatchControllerLogs")
Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchControllerLogs")
Expect(input.Deployment).NotTo(BeNil(), "input.Deployment is required for WatchControllerLogs")
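
After this refactor, both exported entry points converge on the private watchDeploymentLogs helper introduced below: WatchDeploymentLogsByLabelSelector looks up deployments by label, while WatchDeploymentLogsByDeployment derives the label selector from the given Deployment. As a rough caller sketch in Go (assuming the WatchDeploymentLogsBySelectorInput type named in the hunk header above carries the GetLister, ClientSet, Labels, and LogPath fields shown in this diff; the proxy and artifactFolder variables are hypothetical):

framework.WatchDeploymentLogsByLabelSelector(ctx, framework.WatchDeploymentLogsBySelectorInput{
	GetLister: proxy.GetClient(),
	ClientSet: proxy.GetClientSet(),
	Labels:    map[string]string{"app": "test-extension"},
	LogPath:   filepath.Join(artifactFolder, "logs"),
})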
@@ -216,12 +175,26 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {

selector, err := metav1.LabelSelectorAsMap(deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred(), "Failed to Pods selector for deployment %s", klog.KObj(input.Deployment))
+watchDeploymentLogs(ctx, watchDeploymentLogsInput{
+GetLister: input.GetLister,
+ClientSet: input.ClientSet,
+Deployment: deployment,
+Labels: selector,
+LogPath: input.LogPath,
+})
+
+}

+// watchDeploymentLogs streams logs for all containers for all pods belonging to a deployment with the given label. Each container's logs are streamed
+// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors
+// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself,
+// that does not cause the test to fail.
+func watchDeploymentLogs(ctx context.Context, input watchDeploymentLogsInput) {
pods := &corev1.PodList{}
-Expect(input.GetLister.List(ctx, pods, client.InNamespace(input.Deployment.Namespace), client.MatchingLabels(selector))).To(Succeed(), "Failed to list Pods for deployment %s", klog.KObj(input.Deployment))
+Expect(input.GetLister.List(ctx, pods, client.InNamespace(input.Deployment.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s", klog.KObj(input.Deployment))

for _, pod := range pods.Items {
-for _, container := range deployment.Spec.Template.Spec.Containers {
+for _, container := range input.Deployment.Spec.Template.Spec.Containers {
log.Logf("Creating log watcher for controller %s, pod %s, container %s", klog.KObj(input.Deployment), pod.Name, container.Name)

// Create log metadata file.
@@ -236,6 +209,7 @@ func WatchDeploymentLogs(ctx context.Context, input WatchDeploymentLogsInput) {
Container: container.Name,
NodeName: pod.Spec.NodeName,
Stream: "stderr",
+Labels: input.Labels,
}
metadataBytes, err := json.Marshal(&metadata)
Expect(err).To(BeNil())
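
Stripped of the Ginkgo plumbing, the per-container streaming that watchDeploymentLogs now owns follows the pattern visible in the removed lines above. A minimal self-contained sketch (streamContainerLog is a hypothetical name; the client-go calls mirror the original, and stream errors are returned for the caller to log rather than fail the test):

import (
	"bufio"
	"context"
	"io"
	"os"
	"path/filepath"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// streamContainerLog follows one container's logs and appends them to logFile.
func streamContainerLog(ctx context.Context, cs *kubernetes.Clientset, namespace, podName, containerName, logFile string) error {
	// Create the log directory and open the file in append mode.
	if err := os.MkdirAll(filepath.Dir(logFile), 0750); err != nil {
		return err
	}
	f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	// Follow the container's logs until the stream closes or ctx is canceled.
	podLogs, err := cs.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
		Container: containerName,
		Follow:    true,
	}).Stream(ctx)
	if err != nil {
		return err
	}
	defer podLogs.Close()

	// Copy the stream to disk; io.ErrUnexpectedEOF just means the stream ended mid-read.
	out := bufio.NewWriter(f)
	defer out.Flush()
	if _, err := out.ReadFrom(podLogs); err != nil && err != io.ErrUnexpectedEOF {
		return err
	}
	return nil
}

In the framework itself, each such call runs in its own goroutine, one per container, so logs from all pods of all deployments stream concurrently.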