Skip to content

Commit

Permalink
fix(pkg/util/kubernetes/portforward.go): corner case scenario in port…
Browse files Browse the repository at this point in the history
… forwarding for debug cmd is now handled properly.
  • Loading branch information
valdar committed Feb 15, 2024
1 parent 240f1a4 commit 82e6c57
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
5 changes: 5 additions & 0 deletions e2e/commonwithcustominstall/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestKamelCLIDebug(t *testing.T) {

Eventually(portIsInUse("127.0.0.1", "5005"), TestTimeoutMedium, 5*time.Second).Should(BeTrue())
Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed())
Eventually(IntegrationPods(ns, "yaml"), TestTimeoutMedium, 5*time.Second).Should(HaveLen(0))
})

t.Run("debug local port check", func(t *testing.T) {
Expand All @@ -65,6 +66,7 @@ func TestKamelCLIDebug(t *testing.T) {

Eventually(portIsInUse("127.0.0.1", "5006"), TestTimeoutMedium, 5*time.Second).Should(BeTrue())
Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed())
Eventually(IntegrationPods(ns, "yaml"), TestTimeoutMedium, 5*time.Second).Should(HaveLen(0))
})

t.Run("debug logs check", func(t *testing.T) {
Expand All @@ -77,6 +79,7 @@ func TestKamelCLIDebug(t *testing.T) {

Eventually(IntegrationLogs(ns, "yaml"), TestTimeoutMedium).Should(ContainSubstring("Listening for transport dt_socket at address: 5005"))
Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed())
Eventually(IntegrationPods(ns, "yaml"), TestTimeoutMedium, 5*time.Second).Should(HaveLen(0))
})

t.Run("Pod config test", func(t *testing.T) {
Expand All @@ -92,9 +95,11 @@ func TestKamelCLIDebug(t *testing.T) {
}).Should(ContainSubstring("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005"))
Expect(IntegrationPod(ns, "yaml")().GetLabels()["camel.apache.org/debug"]).To(Not(BeNil()))
Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed())
Eventually(IntegrationPods(ns, "yaml"), TestTimeoutMedium, 5*time.Second).Should(HaveLen(0))
})

Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed())
Eventually(IntegrationPods(ns, "yaml"), TestTimeoutMedium, 5*time.Second).Should(HaveLen(0))
})
}

Expand Down
43 changes: 32 additions & 11 deletions pkg/util/kubernetes/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ import (

func PortForward(ctx context.Context, c client.Client, ns, labelSelector string, localPort, remotePort uint, stdOut, stdErr io.Writer) error {
log.InitForCmd()
list, err := c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return err
}

var forwardPod *corev1.Pod
var forwardCtx context.Context
var forwardCtxCancel context.CancelFunc
Expand All @@ -52,19 +45,21 @@ func PortForward(ctx context.Context, c client.Client, ns, labelSelector string,
if forwardPod == nil && podReady(pod) {
forwardPod = pod
forwardCtx, forwardCtxCancel = context.WithCancel(ctx)
log.Debugf("Setting up Port Forward for pod with name: %q\n", forwardPod.Name)
if _, err := portFowardPod(forwardCtx, c.GetConfig(), ns, forwardPod.Name, localPort, remotePort, stdOut, stdErr); err != nil {
return err
}
}
return nil
}

if len(list.Items) > 0 {
if err := setupPortForward(&list.Items[0]); err != nil {
return err
}
log.Debugf("First attempt to bootstrap Port Forward with LabelSelector: %v\n", labelSelector)
list, err := bootstrapPortForward(ctx, c, ns, labelSelector, setupPortForward)
if err != nil {
return err
}

log.Debugf("Instantiating pod event watcher with LabelSelector: %v and ResourceVersion: %v in namespace: %v\n", labelSelector, list.ResourceVersion, ns)
watcher, err := c.CoreV1().Pods(ns).Watch(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
ResourceVersion: list.ResourceVersion,
Expand All @@ -90,6 +85,7 @@ func PortForward(ctx context.Context, c client.Client, ns, labelSelector string,
if !ok {
return fmt.Errorf("type assertion failed: %v", e.Object)
}
log.Debugf("Handling watch.Added event for pod with name: %v\n", pod.Name)
if err := setupPortForward(pod); err != nil {
return err
}
Expand All @@ -98,27 +94,52 @@ func PortForward(ctx context.Context, c client.Client, ns, labelSelector string,
if !ok {
return fmt.Errorf("type assertion failed: %v", e.Object)
}
log.Debugf("Handling watch.Modified event for pod with name: %v\n", pod.Name)
if err := setupPortForward(pod); err != nil {
return err
}
case watch.Deleted:
log.Debugf("Handling watch.Deleted event\n")
if forwardPod != nil && e.Object != nil {
deletedPod, ok := e.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("type assertion failed: %v", e.Object)
}
log.Debugf("Handling watch.Deleted event for pod with name: %v while Port Forward was active for pod with name: %v\n", deletedPod.Name, forwardPod.Name)
if deletedPod.Name == forwardPod.Name {
forwardCtxCancel()
forwardPod = nil
forwardCtx = nil
forwardCtxCancel = nil

log.Debugf("Handling watch.Deleted event, since the pod with Port Forward enabled has been deleted we try to bootstrap Port Forward with LabelSelector: %v\n", labelSelector)
_, err := bootstrapPortForward(ctx, c, ns, labelSelector, setupPortForward)
if err != nil {
return err
}
}
}
}
}
}
}

func bootstrapPortForward(ctx context.Context, c client.Client, ns string, labelSelector string, setupPortForward func(pod *corev1.Pod) error) (*corev1.PodList, error) {
list, err := c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, err
}
if len(list.Items) > 0 {
log.Debugf("Bootstrapping Port Forward for pod with name: %v\n", list.Items[0].Name)
if err := setupPortForward(&list.Items[0]); err != nil {
return nil, err
}
}
return list, nil
}

func portFowardPod(ctx context.Context, config *restclient.Config, ns, pod string, localPort, remotePort uint, stdOut, stdErr io.Writer) (string, error) {
c, err := corev1client.NewForConfig(config)
if err != nil {
Expand Down

0 comments on commit 82e6c57

Please sign in to comment.