diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 958cd88ff6..a9b233a3bb 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -110,16 +110,16 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR taskTemplate.GetContainer().Args = argTemplate } - return &ResourceMetaWrapper{ + return ResourceMetaWrapper{ OutputPrefix: outputPrefix, AgentResourceMeta: res.GetResourceMeta(), Token: "", TaskType: taskTemplate.Type, - }, &ResourceWrapper{State: admin.State_RUNNING}, nil + }, ResourceWrapper{State: admin.State_RUNNING}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - metadata := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper) agent, err := getFinalAgent(metadata.TaskType, p.cfg) if err != nil { @@ -138,7 +138,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } - return &ResourceWrapper{ + return ResourceWrapper{ State: res.Resource.State, Outputs: res.Resource.Outputs, }, nil @@ -167,7 +167,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - resource := taskCtx.Resource().(*ResourceWrapper) + resource := taskCtx.Resource().(ResourceWrapper) taskInfo := &core.TaskInfo{} switch resource.State { @@ -188,7 +188,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase return core.PhaseInfoUndefined, pluginErrors.Errorf(core.SystemErrorCode, "unknown execution phase [%v].", resource.State) } -func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource *ResourceWrapper) error { +func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource ResourceWrapper) error { taskTemplate, err := taskCtx.TaskReader().Read(ctx) if err != nil { return err diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 510c551eeb..238932d638 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -146,12 +146,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR } runID := fmt.Sprintf("%v", data["run_id"]) - return &ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, - &ResourceWrapper{StatusCode: resp.StatusCode}, nil + return ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token}, + ResourceWrapper{StatusCode: resp.StatusCode}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(get, nil, p.cfg.databricksEndpoint, p.cfg.DatabricksInstance, exec.Token, exec.RunID, false) if err != nil { @@ -176,7 +176,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba jobID := fmt.Sprintf("%.0f", data["job_id"]) lifeCycleState := fmt.Sprintf("%s", jobState["life_cycle_state"]) resultState := fmt.Sprintf("%s", jobState["result_state"]) - return &ResourceWrapper{ + return ResourceWrapper{ StatusCode: resp.StatusCode, JobID: jobID, LifeCycleState: lifeCycleState, @@ -206,8 +206,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) - resource := taskCtx.Resource().(*ResourceWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) + resource := taskCtx.Resource().(ResourceWrapper) message := resource.Message statusCode := resource.StatusCode jobID := resource.JobID diff --git a/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go b/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go index 7f2609f9cf..037b70d390 100644 --- a/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go @@ -139,12 +139,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR queryID := fmt.Sprintf("%v", data["statementHandle"]) message := fmt.Sprintf("%v", data["message"]) - return &ResourceMetaWrapper{queryID, queryInfo.Account, token}, - &ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil + return ResourceMetaWrapper{queryID, queryInfo.Account, token}, + ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(get, QueryInfo{}, p.cfg.snowflakeEndpoint, exec.Account, exec.Token, exec.QueryID, false) if err != nil { @@ -160,7 +160,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, err } message := fmt.Sprintf("%v", data["message"]) - return &ResourceWrapper{ + return ResourceWrapper{ StatusCode: resp.StatusCode, Message: message, }, nil @@ -170,7 +170,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error if taskCtx.ResourceMeta() == nil { return nil } - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) req, err := buildRequest(post, QueryInfo{}, p.cfg.snowflakeEndpoint, exec.Account, exec.Token, exec.QueryID, true) if err != nil { @@ -187,8 +187,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error } func (p Plugin) Status(_ context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { - exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper) - statusCode := taskCtx.Resource().(*ResourceWrapper).StatusCode + exec := taskCtx.ResourceMeta().(ResourceMetaWrapper) + statusCode := taskCtx.Resource().(ResourceWrapper).StatusCode if statusCode == 0 { return core.PhaseInfoUndefined, errors.Errorf(ErrSystem, "No Status field set.") } @@ -276,7 +276,7 @@ func newSnowflakeJobTaskPlugin() webapi.PluginEntry { ID: "snowflake", SupportedTaskTypes: []core.TaskType{"snowflake"}, PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { - return &Plugin{ + return Plugin{ metricScope: iCtx.MetricsScope(), cfg: GetConfig(), client: &http.Client{},