diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index acd5201701..68805de340 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -95,9 +95,11 @@ func NewRunTestWorkflowCmd() *cobra.Command { if outputPretty { ui.NL() if watchEnabled { + fmt.Println("uiWatch") exitCode = uiWatch(execution, client) ui.NL() if downloadArtifactsEnabled { + fmt.Println("downlaod artifact") tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty) } } else { @@ -130,7 +132,9 @@ func NewRunTestWorkflowCmd() *cobra.Command { } func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int { + fmt.Println("watch logs") result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client) + fmt.Println("watch logs done", err, result) ui.ExitOnError("reading test workflow execution logs", err) // Apply the result in the execution diff --git a/internal/app/api/v1/testworkflowexecutions.go b/internal/app/api/v1/testworkflowexecutions.go index feea1100ca..f7fc2fcaa8 100644 --- a/internal/app/api/v1/testworkflowexecutions.go +++ b/internal/app/api/v1/testworkflowexecutions.go @@ -456,20 +456,25 @@ func (s *TestkubeAPI) GetTestWorkflowNotificationsStream(ctx context.Context, ex // Check for the logs ctrl, err := testworkflowcontroller.New(ctx, s.Clientset, execution.GetNamespace(s.Namespace), execution.Id, execution.ScheduledAt) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create test workflow controller") } + fmt.Println("GetTestWorkflowNotificationsStream", executionID) // Stream the notifications ch := make(chan testkube.TestWorkflowExecutionNotification) go func() { for n := range ctrl.Watch(ctx) { if n.Error == nil { ch <- n.Value.ToInternal() + } else { + s.Log.Errorw("failed to watch logs", "error", n.Error) } } ctrl.StopController() close(ch) }() + fmt.Println("GetTestWorkflowNotificationsStream done", executionID) + return ch, nil } diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index 10f4202153..26bf564393 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -114,6 +114,7 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c } } if err != nil { + ag.logger.Errorf("error executing workflow notifications request: %s", err.Error()) message := fmt.Sprintf("cannot get pod logs: %s", err.Error()) ag.testWorkflowNotificationsResponseBuffer <- &cloud.TestWorkflowNotificationsResponse{ StreamId: req.StreamId, diff --git a/pkg/api/v1/client/direct_client.go b/pkg/api/v1/client/direct_client.go index a055099a9f..303dc4390a 100644 --- a/pkg/api/v1/client/direct_client.go +++ b/pkg/api/v1/client/direct_client.go @@ -219,6 +219,8 @@ func (t DirectClient[A]) GetLogsV2(uri string, logs chan events.Log) error { // GetTestWorkflowExecutionNotifications returns logs stream from job pods, based on job pods logs func (t DirectClient[A]) GetTestWorkflowExecutionNotifications(uri string, notifications chan testkube.TestWorkflowExecutionNotification) error { + + fmt.Println("get", uri) req, err := http.NewRequest(http.MethodGet, uri, nil) if err != nil { return err @@ -234,6 +236,7 @@ func (t DirectClient[A]) GetTestWorkflowExecutionNotifications(uri string, notif defer close(notifications) defer resp.Body.Close() + fmt.Println("StreamToTestWorkflowExecutionNotificationsChannel") StreamToTestWorkflowExecutionNotificationsChannel(resp.Body, notifications) }() diff --git a/pkg/api/v1/client/testworkflow.go b/pkg/api/v1/client/testworkflow.go index 7e3420eb07..dd7bb394cc 100644 --- a/pkg/api/v1/client/testworkflow.go +++ b/pkg/api/v1/client/testworkflow.go @@ -125,6 +125,7 @@ func (c TestWorkflowClient) ExecuteTestWorkflow(name string, request testkube.Te func (c TestWorkflowClient) GetTestWorkflowExecutionNotifications(id string) (notifications chan testkube.TestWorkflowExecutionNotification, err error) { notifications = make(chan testkube.TestWorkflowExecutionNotification) uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications", id) + fmt.Println("uri", uri) err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications) return notifications, err } diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index a3e0ace239..823c42240e 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -74,6 +74,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Stop immediately after the operation is canceled if ctx.Err() != nil { + fmt.Println("Context error 1", ctx.Err(), context.Cause(ctx)) return } @@ -148,6 +149,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Stop immediately after the operation is canceled if ctx.Err() != nil { + fmt.Println("Context error 2", ctx.Err(), context.Cause(ctx)) return } @@ -185,6 +187,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Stop immediately after the operation is canceled if ctx.Err() != nil { + fmt.Println("Context error 3", ctx.Err(), context.Cause(ctx)) return } @@ -198,6 +201,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Stop immediately after the operation is canceled if ctx.Err() != nil { + fmt.Println("Context error 4", ctx.Err(), context.Cause(ctx)) return } @@ -240,6 +244,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // Stop immediately after the operation is canceled if ctx.Err() != nil { + fmt.Println("Context error 5", ctx.Err(), context.Cause(ctx)) return }