Skip to content

Commit

Permalink
Fix databricks plugin (#4206)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Oct 17, 2023
1 parent a114769 commit 481a966
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR
return nil, nil, pluginErrors.Wrapf(pluginErrors.RuntimeFailure, err,
"Unable to fetch statementHandle from http response")
}
runID := fmt.Sprintf("%v", data["run_id"])
runID := fmt.Sprintf("%.0f", data["run_id"])

return ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token},
ResourceWrapper{StatusCode: resp.StatusCode}, nil
Expand All @@ -156,6 +156,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba
return nil, err
}
resp, err := p.client.Do(req)
logger.Debugf(ctx, "Get databricks job response", "resp", resp)
if err != nil {
logger.Errorf(ctx, "Failed to get databricks job status [%v]", resp)
return nil, err
Expand Down Expand Up @@ -221,7 +222,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
case http.StatusAccepted:
return core.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, taskInfo), nil
case http.StatusOK:
if lifeCycleState == "TERMINATED" {
if lifeCycleState == "TERMINATED" || lifeCycleState == "SKIPPED" || lifeCycleState == "INTERNAL_ERROR" {
if resultState == "SUCCESS" {
if err := writeOutput(ctx, taskCtx); err != nil {
pluginsCore.PhaseInfoFailure(string(rune(statusCode)), "failed to write output", taskInfo)
Expand Down

0 comments on commit 481a966

Please sign in to comment.