Skip to content

Commit

Permalink
Return ResourceWrapper without pointer (#4115)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Oct 5, 2023
1 parent 4ce7406 commit 81b94a4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
12 changes: 6 additions & 6 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR
taskTemplate.GetContainer().Args = argTemplate
}

return &ResourceMetaWrapper{
return ResourceMetaWrapper{
OutputPrefix: outputPrefix,
AgentResourceMeta: res.GetResourceMeta(),
Token: "",
TaskType: taskTemplate.Type,
}, &ResourceWrapper{State: admin.State_RUNNING}, nil
}, ResourceWrapper{State: admin.State_RUNNING}, nil
}

func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
metadata := taskCtx.ResourceMeta().(*ResourceMetaWrapper)
metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper)

agent, err := getFinalAgent(metadata.TaskType, p.cfg)
if err != nil {
Expand All @@ -138,7 +138,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba
return nil, err
}

return &ResourceWrapper{
return ResourceWrapper{
State: res.Resource.State,
Outputs: res.Resource.Outputs,
}, nil
Expand Down Expand Up @@ -167,7 +167,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
}

func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
resource := taskCtx.Resource().(*ResourceWrapper)
resource := taskCtx.Resource().(ResourceWrapper)
taskInfo := &core.TaskInfo{}

switch resource.State {
Expand All @@ -188,7 +188,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
return core.PhaseInfoUndefined, pluginErrors.Errorf(core.SystemErrorCode, "unknown execution phase [%v].", resource.State)
}

func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource *ResourceWrapper) error {
func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, resource ResourceWrapper) error {
taskTemplate, err := taskCtx.TaskReader().Read(ctx)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR
}
runID := fmt.Sprintf("%v", data["run_id"])

return &ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token},
&ResourceWrapper{StatusCode: resp.StatusCode}, nil
return ResourceMetaWrapper{runID, p.cfg.DatabricksInstance, token},
ResourceWrapper{StatusCode: resp.StatusCode}, nil
}

func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper)
exec := taskCtx.ResourceMeta().(ResourceMetaWrapper)
req, err := buildRequest(get, nil, p.cfg.databricksEndpoint,
p.cfg.DatabricksInstance, exec.Token, exec.RunID, false)
if err != nil {
Expand All @@ -176,7 +176,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba
jobID := fmt.Sprintf("%.0f", data["job_id"])
lifeCycleState := fmt.Sprintf("%s", jobState["life_cycle_state"])
resultState := fmt.Sprintf("%s", jobState["result_state"])
return &ResourceWrapper{
return ResourceWrapper{
StatusCode: resp.StatusCode,
JobID: jobID,
LifeCycleState: lifeCycleState,
Expand Down Expand Up @@ -206,8 +206,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
}

func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper)
resource := taskCtx.Resource().(*ResourceWrapper)
exec := taskCtx.ResourceMeta().(ResourceMetaWrapper)
resource := taskCtx.Resource().(ResourceWrapper)
message := resource.Message
statusCode := resource.StatusCode
jobID := resource.JobID
Expand Down
16 changes: 8 additions & 8 deletions flyteplugins/go/tasks/plugins/webapi/snowflake/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR
queryID := fmt.Sprintf("%v", data["statementHandle"])
message := fmt.Sprintf("%v", data["message"])

return &ResourceMetaWrapper{queryID, queryInfo.Account, token},
&ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil
return ResourceMetaWrapper{queryID, queryInfo.Account, token},
ResourceWrapper{StatusCode: resp.StatusCode, Message: message}, nil
}

func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper)
exec := taskCtx.ResourceMeta().(ResourceMetaWrapper)
req, err := buildRequest(get, QueryInfo{}, p.cfg.snowflakeEndpoint,
exec.Account, exec.Token, exec.QueryID, false)
if err != nil {
Expand All @@ -160,7 +160,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba
return nil, err
}
message := fmt.Sprintf("%v", data["message"])
return &ResourceWrapper{
return ResourceWrapper{
StatusCode: resp.StatusCode,
Message: message,
}, nil
Expand All @@ -170,7 +170,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
if taskCtx.ResourceMeta() == nil {
return nil
}
exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper)
exec := taskCtx.ResourceMeta().(ResourceMetaWrapper)
req, err := buildRequest(post, QueryInfo{}, p.cfg.snowflakeEndpoint,
exec.Account, exec.Token, exec.QueryID, true)
if err != nil {
Expand All @@ -187,8 +187,8 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
}

func (p Plugin) Status(_ context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
exec := taskCtx.ResourceMeta().(*ResourceMetaWrapper)
statusCode := taskCtx.Resource().(*ResourceWrapper).StatusCode
exec := taskCtx.ResourceMeta().(ResourceMetaWrapper)
statusCode := taskCtx.Resource().(ResourceWrapper).StatusCode
if statusCode == 0 {
return core.PhaseInfoUndefined, errors.Errorf(ErrSystem, "No Status field set.")
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func newSnowflakeJobTaskPlugin() webapi.PluginEntry {
ID: "snowflake",
SupportedTaskTypes: []core.TaskType{"snowflake"},
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
return &Plugin{
return Plugin{
metricScope: iCtx.MetricsScope(),
cfg: GetConfig(),
client: &http.Client{},
Expand Down

0 comments on commit 81b94a4

Please sign in to comment.