Skip to content

Commit

Permalink
Register file and Get executions functionality (flyteorg#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmahindrakar-oss authored and robert-ulbrich-mercedes-benz committed Jul 2, 2024
1 parent 2b7361e commit e8eb060
Show file tree
Hide file tree
Showing 16 changed files with 854 additions and 8 deletions.
15 changes: 10 additions & 5 deletions flytectl/cmd/core/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@ package cmdcore
import (
"context"
"fmt"
"github.com/spf13/pflag"

"github.com/lyft/flyteidl/clients/go/admin"
"github.com/spf13/cobra"

"github.com/lyft/flytectl/cmd/config"
)

type PFlagProvider interface {
GetPFlagSet(prefix string) *pflag.FlagSet
}

type CommandEntry struct {
ProjectDomainNotRequired bool
CmdFunc CommandFunc
Aliases []string
PFlagProvider PFlagProvider
}

func AddCommands(rootCmd *cobra.Command, cmdFuncs map[string]CommandEntry) {
Expand All @@ -24,7 +30,9 @@ func AddCommands(rootCmd *cobra.Command, cmdFuncs map[string]CommandEntry) {
Aliases: cmdEntry.Aliases,
RunE: generateCommandFunc(cmdEntry),
}

if cmdEntry.PFlagProvider != nil {
cmd.Flags().AddFlagSet(cmdEntry.PFlagProvider.GetPFlagSet(""))
}
rootCmd.AddCommand(cmd)
}
}
Expand All @@ -49,9 +57,6 @@ func generateCommandFunc(cmdEntry CommandEntry) func(cmd *cobra.Command, args []
if err != nil {
return err
}
return cmdEntry.CmdFunc(ctx, args, CommandContext{
out: cmd.OutOrStdout(),
adminClient: adminClient,
})
return cmdEntry.CmdFunc(ctx, args, NewCommandContext(adminClient, cmd.OutOrStdout()))
}
}
4 changes: 4 additions & 0 deletions flytectl/cmd/core/cmd_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type CommandContext struct {
out io.Writer
}

func NewCommandContext(adminClient service.AdminServiceClient, out io.Writer) CommandContext {
return CommandContext{adminClient: adminClient, out : out}
}

func (c CommandContext) AdminClient() service.AdminServiceClient {
return c.adminClient
}
Expand Down
68 changes: 68 additions & 0 deletions flytectl/cmd/get/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package get

import (
"context"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/golang/protobuf/proto"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flytectl/cmd/config"
cmdCore "github.com/lyft/flytectl/cmd/core"
"github.com/lyft/flytectl/pkg/printer"
)

var executionColumns = []printer.Column{
{"Name", "$.id.name"},
{"Workflow Name", "$.closure.workflowId.name"},
{"Type", "$.closure.workflowId.resourceType"},
{"Phase", "$.closure.phase"},
{"Started", "$.closure.startedAt"},
{"Elapsed Time", "$.closure.duration"},
}

func ExecutionToProtoMessages(l []*admin.Execution) []proto.Message {
messages := make([]proto.Message, 0, len(l))
for _, m := range l {
messages = append(messages, m)
}
return messages
}

func getExecutionFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
adminPrinter := printer.Printer{}
var executions []* admin.Execution
if len(args) > 0 {
name := args[0]
execution, err := cmdCtx.AdminClient().GetExecution(ctx, &admin.WorkflowExecutionGetRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: name,
},
})
if err != nil {
return err
}
executions = append(executions, execution)
} else {
executionList, err := cmdCtx.AdminClient().ListExecutions(ctx, &admin.ResourceListRequest{
Limit: 100,
Id: &admin.NamedEntityIdentifier{
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
},
})
if err != nil {
return err
}
executions = executionList.Executions
}
logger.Infof(ctx, "Retrieved %v executions", len(executions))
err := adminPrinter.Print(config.GetConfig().MustOutputFormat(), executionColumns, ExecutionToProtoMessages(executions)...)
if err != nil {
return err
}
return nil
}
219 changes: 219 additions & 0 deletions flytectl/cmd/get/execution_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package get

import (
"context"
"errors"
"github.com/lyft/flytectl/cmd/config"
cmdCore "github.com/lyft/flytectl/cmd/core"
"github.com/lyft/flyteidl/clients/go/admin/mocks"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
"io"
"testing"
)

const projectValue = "dummyProject"
const domainValue = "dummyDomain"
const executionNameValue = "e124"
const launchPlanNameValue = "lp_name"
const launchPlanVersionValue = "lp_version"
const workflowNameValue = "wf_name"
const workflowVersionValue = "wf_version"

func TestListExecutionFunc(t *testing.T) {
ctx := context.Background()
config.GetConfig().Project = projectValue
config.GetConfig().Domain = domainValue
config.GetConfig().Output = "json"
var args []string
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
execListRequest := &admin.ResourceListRequest{
Limit: 100,
Id: &admin.NamedEntityIdentifier{
Project: projectValue,
Domain: domainValue,
},
}
executionResponse := &admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Project: projectValue,
Domain: domainValue,
Name: executionNameValue,
},
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: launchPlanNameValue,
Version: launchPlanVersionValue,
},
},
Closure: &admin.ExecutionClosure{
WorkflowId: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: workflowNameValue,
Version: workflowVersionValue,
},
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
var executions []*admin.Execution
executions = append(executions, executionResponse)
executionList := &admin.ExecutionList{
Executions: executions,
}
mockClient.OnListExecutionsMatch(ctx, execListRequest).Return(executionList, nil)
err := getExecutionFunc(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "ListExecutions", ctx, execListRequest)
}

func TestListExecutionFuncWithError(t *testing.T) {
ctx := context.Background()
config.GetConfig().Project = projectValue
config.GetConfig().Domain = domainValue
config.GetConfig().Output = "json"
var args []string
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
execListRequest := &admin.ResourceListRequest{
Limit: 100,
Id: &admin.NamedEntityIdentifier{
Project: projectValue,
Domain: domainValue,
},
}
executionResponse := &admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Project: projectValue,
Domain: domainValue,
Name: executionNameValue,
},
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: launchPlanNameValue,
Version: launchPlanVersionValue,
},
},
Closure: &admin.ExecutionClosure{
WorkflowId: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: workflowNameValue,
Version: workflowVersionValue,
},
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
var executions []*admin.Execution
executions = append(executions, executionResponse)
mockClient.OnListExecutionsMatch(ctx, execListRequest).Return(nil, errors.New("Executions NotFound."))
err := getExecutionFunc(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("Executions NotFound."))
mockClient.AssertCalled(t, "ListExecutions", ctx, execListRequest)
}

func TestGetExecutionFunc(t *testing.T) {
ctx := context.Background()
config.GetConfig().Project = projectValue
config.GetConfig().Domain = domainValue
config.GetConfig().Output = "json"
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
execGetRequest := &admin.WorkflowExecutionGetRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: projectValue,
Domain: domainValue,
Name: executionNameValue,
},
}
executionResponse := &admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Project: projectValue,
Domain: domainValue,
Name: executionNameValue,
},
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: launchPlanNameValue,
Version: launchPlanVersionValue,
},
},
Closure: &admin.ExecutionClosure{
WorkflowId: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: workflowNameValue,
Version: workflowVersionValue,
},
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
var executions []*admin.Execution
executions = append(executions, executionResponse)
args := []string{executionNameValue}
mockClient.OnGetExecutionMatch(ctx, execGetRequest).Return(executionResponse, nil)
err := getExecutionFunc(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "GetExecution", ctx, execGetRequest)
}

func TestGetExecutionFuncWithError(t *testing.T) {
ctx := context.Background()
config.GetConfig().Project = projectValue
config.GetConfig().Domain = domainValue
config.GetConfig().Output = "json"
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
execGetRequest := &admin.WorkflowExecutionGetRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: projectValue,
Domain: domainValue,
Name: executionNameValue,
},
}
executionResponse := &admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Project: projectValue,
Domain: domainValue,
Name: executionNameValue,
},
Spec: &admin.ExecutionSpec{
LaunchPlan: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: launchPlanNameValue,
Version: launchPlanVersionValue,
},
},
Closure: &admin.ExecutionClosure{
WorkflowId: &core.Identifier{
Project: projectValue,
Domain: domainValue,
Name: workflowNameValue,
Version: workflowVersionValue,
},
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
var executions []*admin.Execution
executions = append(executions, executionResponse)
args := []string{executionNameValue}
mockClient.OnGetExecutionMatch(ctx, execGetRequest).Return(nil, errors.New("Execution NotFound."))
err := getExecutionFunc(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("Execution NotFound."))
mockClient.AssertCalled(t, "GetExecution", ctx, execGetRequest)
}
1 change: 1 addition & 0 deletions flytectl/cmd/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func CreateGetCommand() *cobra.Command {
"task": {CmdFunc: getTaskFunc, Aliases: []string{"tasks"}},
"workflow": {CmdFunc: getWorkflowFunc, Aliases: []string{"workflows"}},
"launchplan": {CmdFunc: getLaunchPlanFunc, Aliases: []string{"launchplans"}},
"execution": {CmdFunc: getExecutionFunc, Aliases: []string{"executions"}},
}

cmdcore.AddCommands(getCmd, getResourcesFuncs)
Expand Down
36 changes: 36 additions & 0 deletions flytectl/cmd/get/get_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package get

import (
"fmt"
"github.com/stretchr/testify/assert"
"sort"
"testing"
)

func TestCreateGetCommand(t *testing.T) {
getCommand := CreateGetCommand()
assert.Equal(t, getCommand.Use , "get")
assert.Equal(t, getCommand.Short , "Retrieve various resource.")
fmt.Println(getCommand.Commands())
assert.Equal(t, len(getCommand.Commands()), 5)
cmdNouns := getCommand.Commands()
// Sort by Use value.
sort.Slice(cmdNouns, func(i, j int) bool {
return cmdNouns[i].Use < cmdNouns[j].Use
})
assert.Equal(t, cmdNouns[0].Use, "execution")
assert.Equal(t, cmdNouns[0].Aliases, []string{"executions"})
assert.Equal(t, cmdNouns[0].Short, "Retrieves execution resources.")
assert.Equal(t, cmdNouns[1].Use, "launchplan")
assert.Equal(t, cmdNouns[1].Aliases, []string{"launchplans"})
assert.Equal(t, cmdNouns[1].Short, "Retrieves launchplan resources.")
assert.Equal(t, cmdNouns[2].Use, "project")
assert.Equal(t, cmdNouns[2].Aliases, []string{"projects"})
assert.Equal(t, cmdNouns[2].Short, "Retrieves project resources.")
assert.Equal(t, cmdNouns[3].Use, "task")
assert.Equal(t, cmdNouns[3].Aliases, []string{"tasks"})
assert.Equal(t, cmdNouns[3].Short, "Retrieves task resources.")
assert.Equal(t, cmdNouns[4].Use, "workflow")
assert.Equal(t, cmdNouns[4].Aliases, []string{"workflows"})
assert.Equal(t, cmdNouns[4].Short, "Retrieves workflow resources.")
}
Loading

0 comments on commit e8eb060

Please sign in to comment.