From 52a5668956b4b648996b617dada8588f0a86a599 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Oct 2023 15:25:30 -0700 Subject: [PATCH] Fix databricks plugin (#4206) --------- Signed-off-by: Kevin Su Signed-off-by: squiishyy --- flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index a0e5aa722e..1ce0d747f4 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -141,7 +141,7 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR return nil, nil, pluginErrors.Wrapf(pluginErrors.RuntimeFailure, err, "Unable to fetch statementHandle from http response") } - runID := fmt.Sprintf("%v", data["run_id"]) + runID := fmt.Sprintf("%.0f", data["run_id"]) return ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, ResourceWrapper{StatusCode: resp.StatusCode}, nil @@ -156,6 +156,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } resp, err := p.client.Do(req) + logger.Debugf(ctx, "Get databricks job response", "resp", resp) if err != nil { logger.Errorf(ctx, "Failed to get databricks job status [%v]", resp) return nil, err @@ -221,7 +222,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase case http.StatusAccepted: return core.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, taskInfo), nil case http.StatusOK: - if lifeCycleState == "TERMINATED" { + if lifeCycleState == "TERMINATED" || lifeCycleState == "SKIPPED" || lifeCycleState == "INTERNAL_ERROR" { if resultState == "SUCCESS" { if err := writeOutput(ctx, taskCtx); err != nil { pluginsCore.PhaseInfoFailure(string(rune(statusCode)), "failed to write output", taskInfo)