From 0b3e01a00d41c06c53a5ee63dc829ae83205877c Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss <77798312+pmahindrakar-oss@users.noreply.github.com> Date: Sat, 6 Feb 2021 23:06:23 +0530 Subject: [PATCH] Register file and Get executions functionality (#22) --- flytectl/cmd/core/cmd.go | 15 +- flytectl/cmd/core/cmd_ctx.go | 4 + flytectl/cmd/get/execution.go | 68 ++++++ flytectl/cmd/get/execution_test.go | 219 ++++++++++++++++++ flytectl/cmd/get/get.go | 1 + flytectl/cmd/get/get_test.go | 36 +++ flytectl/cmd/register/files.go | 61 +++++ flytectl/cmd/register/register.go | 23 ++ flytectl/cmd/register/register_test.go | 24 ++ flytectl/cmd/register/register_util.go | 206 ++++++++++++++++ .../cmd/register/registerfilesconfig_flags.go | 46 ++++ .../registerfilesconfig_flags_test.go | 146 ++++++++++++ flytectl/cmd/root.go | 2 + flytectl/go.mod | 2 +- flytectl/go.sum | 2 + flytectl/pkg/printer/printer.go | 7 +- 16 files changed, 854 insertions(+), 8 deletions(-) create mode 100644 flytectl/cmd/get/execution.go create mode 100644 flytectl/cmd/get/execution_test.go create mode 100644 flytectl/cmd/get/get_test.go create mode 100644 flytectl/cmd/register/files.go create mode 100644 flytectl/cmd/register/register.go create mode 100644 flytectl/cmd/register/register_test.go create mode 100644 flytectl/cmd/register/register_util.go create mode 100755 flytectl/cmd/register/registerfilesconfig_flags.go create mode 100755 flytectl/cmd/register/registerfilesconfig_flags_test.go diff --git a/flytectl/cmd/core/cmd.go b/flytectl/cmd/core/cmd.go index 07d57bb14c8..8e2463aad22 100644 --- a/flytectl/cmd/core/cmd.go +++ b/flytectl/cmd/core/cmd.go @@ -3,6 +3,7 @@ package cmdcore import ( "context" "fmt" + "github.com/spf13/pflag" "github.com/lyft/flyteidl/clients/go/admin" "github.com/spf13/cobra" @@ -10,10 +11,15 @@ import ( "github.com/lyft/flytectl/cmd/config" ) +type PFlagProvider interface { + GetPFlagSet(prefix string) *pflag.FlagSet +} + type CommandEntry struct { ProjectDomainNotRequired bool CmdFunc CommandFunc Aliases []string + PFlagProvider PFlagProvider } func AddCommands(rootCmd *cobra.Command, cmdFuncs map[string]CommandEntry) { @@ -24,7 +30,9 @@ func AddCommands(rootCmd *cobra.Command, cmdFuncs map[string]CommandEntry) { Aliases: cmdEntry.Aliases, RunE: generateCommandFunc(cmdEntry), } - + if cmdEntry.PFlagProvider != nil { + cmd.Flags().AddFlagSet(cmdEntry.PFlagProvider.GetPFlagSet("")) + } rootCmd.AddCommand(cmd) } } @@ -49,9 +57,6 @@ func generateCommandFunc(cmdEntry CommandEntry) func(cmd *cobra.Command, args [] if err != nil { return err } - return cmdEntry.CmdFunc(ctx, args, CommandContext{ - out: cmd.OutOrStdout(), - adminClient: adminClient, - }) + return cmdEntry.CmdFunc(ctx, args, NewCommandContext(adminClient, cmd.OutOrStdout())) } } diff --git a/flytectl/cmd/core/cmd_ctx.go b/flytectl/cmd/core/cmd_ctx.go index c39a0d902ac..bf593b2b4c6 100644 --- a/flytectl/cmd/core/cmd_ctx.go +++ b/flytectl/cmd/core/cmd_ctx.go @@ -12,6 +12,10 @@ type CommandContext struct { out io.Writer } +func NewCommandContext(adminClient service.AdminServiceClient, out io.Writer) CommandContext { + return CommandContext{adminClient: adminClient, out : out} +} + func (c CommandContext) AdminClient() service.AdminServiceClient { return c.adminClient } diff --git a/flytectl/cmd/get/execution.go b/flytectl/cmd/get/execution.go new file mode 100644 index 00000000000..b0a51deade1 --- /dev/null +++ b/flytectl/cmd/get/execution.go @@ -0,0 +1,68 @@ +package get + +import ( + "context" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + + "github.com/golang/protobuf/proto" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flytestdlib/logger" + + "github.com/lyft/flytectl/cmd/config" + cmdCore "github.com/lyft/flytectl/cmd/core" + "github.com/lyft/flytectl/pkg/printer" +) + +var executionColumns = []printer.Column{ + {"Name", "$.id.name"}, + {"Workflow Name", "$.closure.workflowId.name"}, + {"Type", "$.closure.workflowId.resourceType"}, + {"Phase", "$.closure.phase"}, + {"Started", "$.closure.startedAt"}, + {"Elapsed Time", "$.closure.duration"}, +} + +func ExecutionToProtoMessages(l []*admin.Execution) []proto.Message { + messages := make([]proto.Message, 0, len(l)) + for _, m := range l { + messages = append(messages, m) + } + return messages +} + +func getExecutionFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { + adminPrinter := printer.Printer{} + var executions []* admin.Execution + if len(args) > 0 { + name := args[0] + execution, err := cmdCtx.AdminClient().GetExecution(ctx, &admin.WorkflowExecutionGetRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: config.GetConfig().Project, + Domain: config.GetConfig().Domain, + Name: name, + }, + }) + if err != nil { + return err + } + executions = append(executions, execution) + } else { + executionList, err := cmdCtx.AdminClient().ListExecutions(ctx, &admin.ResourceListRequest{ + Limit: 100, + Id: &admin.NamedEntityIdentifier{ + Project: config.GetConfig().Project, + Domain: config.GetConfig().Domain, + }, + }) + if err != nil { + return err + } + executions = executionList.Executions + } + logger.Infof(ctx, "Retrieved %v executions", len(executions)) + err := adminPrinter.Print(config.GetConfig().MustOutputFormat(), executionColumns, ExecutionToProtoMessages(executions)...) + if err != nil { + return err + } + return nil +} diff --git a/flytectl/cmd/get/execution_test.go b/flytectl/cmd/get/execution_test.go new file mode 100644 index 00000000000..b7a83cbef69 --- /dev/null +++ b/flytectl/cmd/get/execution_test.go @@ -0,0 +1,219 @@ +package get + +import ( + "context" + "errors" + "github.com/lyft/flytectl/cmd/config" + cmdCore "github.com/lyft/flytectl/cmd/core" + "github.com/lyft/flyteidl/clients/go/admin/mocks" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" + "io" + "testing" +) + +const projectValue = "dummyProject" +const domainValue = "dummyDomain" +const executionNameValue = "e124" +const launchPlanNameValue = "lp_name" +const launchPlanVersionValue = "lp_version" +const workflowNameValue = "wf_name" +const workflowVersionValue = "wf_version" + +func TestListExecutionFunc(t *testing.T) { + ctx := context.Background() + config.GetConfig().Project = projectValue + config.GetConfig().Domain = domainValue + config.GetConfig().Output = "json" + var args []string + mockClient := new(mocks.AdminServiceClient) + mockOutStream := new(io.Writer) + cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream) + execListRequest := &admin.ResourceListRequest{ + Limit: 100, + Id: &admin.NamedEntityIdentifier{ + Project: projectValue, + Domain: domainValue, + }, + } + executionResponse := &admin.Execution{ + Id: &core.WorkflowExecutionIdentifier{ + Project: projectValue, + Domain: domainValue, + Name: executionNameValue, + }, + Spec: &admin.ExecutionSpec{ + LaunchPlan: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: launchPlanNameValue, + Version: launchPlanVersionValue, + }, + }, + Closure: &admin.ExecutionClosure{ + WorkflowId: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: workflowNameValue, + Version: workflowVersionValue, + }, + Phase: core.WorkflowExecution_SUCCEEDED, + }, + } + var executions []*admin.Execution + executions = append(executions, executionResponse) + executionList := &admin.ExecutionList{ + Executions: executions, + } + mockClient.OnListExecutionsMatch(ctx, execListRequest).Return(executionList, nil) + err := getExecutionFunc(ctx, args, cmdCtx) + assert.Nil(t, err) + mockClient.AssertCalled(t, "ListExecutions", ctx, execListRequest) +} + +func TestListExecutionFuncWithError(t *testing.T) { + ctx := context.Background() + config.GetConfig().Project = projectValue + config.GetConfig().Domain = domainValue + config.GetConfig().Output = "json" + var args []string + mockClient := new(mocks.AdminServiceClient) + mockOutStream := new(io.Writer) + cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream) + execListRequest := &admin.ResourceListRequest{ + Limit: 100, + Id: &admin.NamedEntityIdentifier{ + Project: projectValue, + Domain: domainValue, + }, + } + executionResponse := &admin.Execution{ + Id: &core.WorkflowExecutionIdentifier{ + Project: projectValue, + Domain: domainValue, + Name: executionNameValue, + }, + Spec: &admin.ExecutionSpec{ + LaunchPlan: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: launchPlanNameValue, + Version: launchPlanVersionValue, + }, + }, + Closure: &admin.ExecutionClosure{ + WorkflowId: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: workflowNameValue, + Version: workflowVersionValue, + }, + Phase: core.WorkflowExecution_SUCCEEDED, + }, + } + var executions []*admin.Execution + executions = append(executions, executionResponse) + mockClient.OnListExecutionsMatch(ctx, execListRequest).Return(nil, errors.New("Executions NotFound.")) + err := getExecutionFunc(ctx, args, cmdCtx) + assert.NotNil(t, err) + assert.Equal(t, err, errors.New("Executions NotFound.")) + mockClient.AssertCalled(t, "ListExecutions", ctx, execListRequest) +} + +func TestGetExecutionFunc(t *testing.T) { + ctx := context.Background() + config.GetConfig().Project = projectValue + config.GetConfig().Domain = domainValue + config.GetConfig().Output = "json" + mockClient := new(mocks.AdminServiceClient) + mockOutStream := new(io.Writer) + cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream) + execGetRequest := &admin.WorkflowExecutionGetRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: projectValue, + Domain: domainValue, + Name: executionNameValue, + }, + } + executionResponse := &admin.Execution{ + Id: &core.WorkflowExecutionIdentifier{ + Project: projectValue, + Domain: domainValue, + Name: executionNameValue, + }, + Spec: &admin.ExecutionSpec{ + LaunchPlan: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: launchPlanNameValue, + Version: launchPlanVersionValue, + }, + }, + Closure: &admin.ExecutionClosure{ + WorkflowId: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: workflowNameValue, + Version: workflowVersionValue, + }, + Phase: core.WorkflowExecution_SUCCEEDED, + }, + } + var executions []*admin.Execution + executions = append(executions, executionResponse) + args := []string{executionNameValue} + mockClient.OnGetExecutionMatch(ctx, execGetRequest).Return(executionResponse, nil) + err := getExecutionFunc(ctx, args, cmdCtx) + assert.Nil(t, err) + mockClient.AssertCalled(t, "GetExecution", ctx, execGetRequest) +} + +func TestGetExecutionFuncWithError(t *testing.T) { + ctx := context.Background() + config.GetConfig().Project = projectValue + config.GetConfig().Domain = domainValue + config.GetConfig().Output = "json" + mockClient := new(mocks.AdminServiceClient) + mockOutStream := new(io.Writer) + cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream) + execGetRequest := &admin.WorkflowExecutionGetRequest{ + Id: &core.WorkflowExecutionIdentifier{ + Project: projectValue, + Domain: domainValue, + Name: executionNameValue, + }, + } + executionResponse := &admin.Execution{ + Id: &core.WorkflowExecutionIdentifier{ + Project: projectValue, + Domain: domainValue, + Name: executionNameValue, + }, + Spec: &admin.ExecutionSpec{ + LaunchPlan: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: launchPlanNameValue, + Version: launchPlanVersionValue, + }, + }, + Closure: &admin.ExecutionClosure{ + WorkflowId: &core.Identifier{ + Project: projectValue, + Domain: domainValue, + Name: workflowNameValue, + Version: workflowVersionValue, + }, + Phase: core.WorkflowExecution_SUCCEEDED, + }, + } + var executions []*admin.Execution + executions = append(executions, executionResponse) + args := []string{executionNameValue} + mockClient.OnGetExecutionMatch(ctx, execGetRequest).Return(nil, errors.New("Execution NotFound.")) + err := getExecutionFunc(ctx, args, cmdCtx) + assert.NotNil(t, err) + assert.Equal(t, err, errors.New("Execution NotFound.")) + mockClient.AssertCalled(t, "GetExecution", ctx, execGetRequest) +} diff --git a/flytectl/cmd/get/get.go b/flytectl/cmd/get/get.go index 097206c2ffe..ed86e5d0d01 100644 --- a/flytectl/cmd/get/get.go +++ b/flytectl/cmd/get/get.go @@ -18,6 +18,7 @@ func CreateGetCommand() *cobra.Command { "task": {CmdFunc: getTaskFunc, Aliases: []string{"tasks"}}, "workflow": {CmdFunc: getWorkflowFunc, Aliases: []string{"workflows"}}, "launchplan": {CmdFunc: getLaunchPlanFunc, Aliases: []string{"launchplans"}}, + "execution": {CmdFunc: getExecutionFunc, Aliases: []string{"executions"}}, } cmdcore.AddCommands(getCmd, getResourcesFuncs) diff --git a/flytectl/cmd/get/get_test.go b/flytectl/cmd/get/get_test.go new file mode 100644 index 00000000000..8602d161c74 --- /dev/null +++ b/flytectl/cmd/get/get_test.go @@ -0,0 +1,36 @@ +package get + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "sort" + "testing" +) + +func TestCreateGetCommand(t *testing.T) { + getCommand := CreateGetCommand() + assert.Equal(t, getCommand.Use , "get") + assert.Equal(t, getCommand.Short , "Retrieve various resource.") + fmt.Println(getCommand.Commands()) + assert.Equal(t, len(getCommand.Commands()), 5) + cmdNouns := getCommand.Commands() + // Sort by Use value. + sort.Slice(cmdNouns, func(i, j int) bool { + return cmdNouns[i].Use < cmdNouns[j].Use + }) + assert.Equal(t, cmdNouns[0].Use, "execution") + assert.Equal(t, cmdNouns[0].Aliases, []string{"executions"}) + assert.Equal(t, cmdNouns[0].Short, "Retrieves execution resources.") + assert.Equal(t, cmdNouns[1].Use, "launchplan") + assert.Equal(t, cmdNouns[1].Aliases, []string{"launchplans"}) + assert.Equal(t, cmdNouns[1].Short, "Retrieves launchplan resources.") + assert.Equal(t, cmdNouns[2].Use, "project") + assert.Equal(t, cmdNouns[2].Aliases, []string{"projects"}) + assert.Equal(t, cmdNouns[2].Short, "Retrieves project resources.") + assert.Equal(t, cmdNouns[3].Use, "task") + assert.Equal(t, cmdNouns[3].Aliases, []string{"tasks"}) + assert.Equal(t, cmdNouns[3].Short, "Retrieves task resources.") + assert.Equal(t, cmdNouns[4].Use, "workflow") + assert.Equal(t, cmdNouns[4].Aliases, []string{"workflows"}) + assert.Equal(t, cmdNouns[4].Short, "Retrieves workflow resources.") +} diff --git a/flytectl/cmd/register/files.go b/flytectl/cmd/register/files.go new file mode 100644 index 00000000000..4787a69b27f --- /dev/null +++ b/flytectl/cmd/register/files.go @@ -0,0 +1,61 @@ +package register + +import ( + "context" + "encoding/json" + "fmt" + cmdCore "github.com/lyft/flytectl/cmd/core" + "github.com/lyft/flytectl/pkg/printer" + "github.com/lyft/flytestdlib/logger" + "io/ioutil" + "sort" +) + +func registerFromFilesFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { + files := args + sort.Strings(files) + logger.Infof(ctx, "Parsing files... Total(%v)", len(files)) + logger.Infof(ctx, "Params version %v", filesConfig.version) + var registerResults [] RegisterResult + adminPrinter := printer.Printer{} + fastFail := !filesConfig.skipOnError + logger.Infof(ctx, "Fail fast %v", fastFail) + var _err error + for i := 0; i< len(files) && !(fastFail && _err != nil) ; i++ { + absFilePath := files[i] + var registerResult RegisterResult + logger.Infof(ctx, "Parsing %v", absFilePath) + fileContents, err := ioutil.ReadFile(absFilePath) + if err != nil { + registerResult = RegisterResult{Name: absFilePath, Status: "Failed", Info: fmt.Sprintf("Error reading file due to %v", err)} + registerResults = append(registerResults, registerResult) + _err = err + continue + } + spec, err := unMarshalContents(ctx, fileContents, absFilePath) + if err != nil { + registerResult = RegisterResult{Name: absFilePath, Status: "Failed", Info: fmt.Sprintf("Error unmarshalling file due to %v", err)} + registerResults = append(registerResults, registerResult) + _err = err + continue + } + if err := hydrateSpec(spec); err != nil { + registerResult = RegisterResult{Name: absFilePath, Status: "Failed", Info: fmt.Sprintf("Error hydrating spec due to %v", err)} + registerResults = append(registerResults, registerResult) + _err = err + continue + } + logger.Debugf(ctx, "Hydrated spec : %v", getJsonSpec(spec)) + if err := register(ctx, spec, cmdCtx); err != nil { + registerResult = RegisterResult{Name: absFilePath, Status: "Failed", Info: fmt.Sprintf("Error registering file due to %v", err)} + registerResults = append(registerResults, registerResult) + _err = err + continue + } + registerResult = RegisterResult{Name: absFilePath, Status: "Success", Info: "Successfully registered file"} + registerResults = append(registerResults, registerResult) + } + payload, _ := json.Marshal(registerResults) + adminPrinter.JSONToTable(payload, projectColumns) + return nil +} diff --git a/flytectl/cmd/register/register.go b/flytectl/cmd/register/register.go new file mode 100644 index 00000000000..f6381521e44 --- /dev/null +++ b/flytectl/cmd/register/register.go @@ -0,0 +1,23 @@ +package register + +import ( + cmdcore "github.com/lyft/flytectl/cmd/core" + "github.com/spf13/cobra" +) + +// RegisterCommand will return register command +func RegisterCommand() *cobra.Command { + registerCmd := &cobra.Command{ + Use: "register", + Short: "Registers tasks/workflows/launchplans from list of generated serialized files.", + Long: "Takes input files as serialized versions of the tasks/workflows/launchplans and registers them with flyteadmin.\n" + + "Currently these input files are protobuf files generated as output from flytekit serialize.\n" + + "Project & Domain are mandatory fields to be passed for registration and an optional version which defaults to v1\n" + + "If the entities are already registered with flyte for the same version then registration would fail.\n", + } + registerResourcesFuncs := map[string]cmdcore.CommandEntry{ + "files": {CmdFunc: registerFromFilesFunc, Aliases: []string{"file"}, PFlagProvider: filesConfig}, + } + cmdcore.AddCommands(registerCmd, registerResourcesFuncs) + return registerCmd +} diff --git a/flytectl/cmd/register/register_test.go b/flytectl/cmd/register/register_test.go new file mode 100644 index 00000000000..99e4a6964f5 --- /dev/null +++ b/flytectl/cmd/register/register_test.go @@ -0,0 +1,24 @@ +package register + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "sort" + "testing" +) + +func TestRegisterCommand(t *testing.T) { + registerCommand := RegisterCommand() + assert.Equal(t, registerCommand.Use, "register") + assert.Equal(t, registerCommand.Short, "Registers tasks/workflows/launchplans from list of generated serialized files.") + fmt.Println(registerCommand.Commands()) + assert.Equal(t, len(registerCommand.Commands()), 1) + cmdNouns := registerCommand.Commands() + // Sort by Use value. + sort.Slice(cmdNouns, func(i, j int) bool { + return cmdNouns[i].Use < cmdNouns[j].Use + }) + assert.Equal(t, cmdNouns[0].Use, "files") + assert.Equal(t, cmdNouns[0].Aliases, []string{"file"}) + assert.Equal(t, cmdNouns[0].Short, "Retrieves files resources.") +} diff --git a/flytectl/cmd/register/register_util.go b/flytectl/cmd/register/register_util.go new file mode 100644 index 00000000000..5149fd1e217 --- /dev/null +++ b/flytectl/cmd/register/register_util.go @@ -0,0 +1,206 @@ +package register + +import ( + "context" + "errors" + "fmt" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/lyft/flytectl/cmd/config" + cmdCore "github.com/lyft/flytectl/cmd/core" + "github.com/lyft/flytectl/pkg/printer" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/logger" +) + +//go:generate pflags RegisterFilesConfig + +var ( + filesConfig = &RegisterFilesConfig{ + version: "v1", + skipOnError: false, + } +) + +const registrationProjectPattern = "{{ registration.project }}" +const registrationDomainPattern = "{{ registration.domain }}" +const registrationVersionPattern = "{{ registration.version }}" + +type RegisterFilesConfig struct { + version string `json:"version" pflag:",version of the entity to be registered with flyte."` + skipOnError bool `json:"skipOnError" pflag:",fail fast when registering files."` +} + +type RegisterResult struct { + Name string + Status string + Info string +} + +var projectColumns = []printer.Column{ + {"Name", "$.Name"}, + {"Status", "$.Status"}, + {"Additional Info", "$.Info"}, +} + +func unMarshalContents(ctx context.Context, fileContents []byte, fname string) (proto.Message, error) { + workflowSpec := &admin.WorkflowSpec{} + if err := proto.Unmarshal(fileContents, workflowSpec); err == nil { + return workflowSpec, nil + } + logger.Debugf(ctx, "Failed to unmarshal file %v for workflow type", fname) + taskSpec := &admin.TaskSpec{} + if err := proto.Unmarshal(fileContents, taskSpec); err == nil { + return taskSpec, nil + } + logger.Debugf(ctx, "Failed to unmarshal file %v for task type", fname) + launchPlan := &admin.LaunchPlan{} + if err := proto.Unmarshal(fileContents, launchPlan); err == nil { + return launchPlan, nil + } + logger.Debugf(ctx, "Failed to unmarshal file %v for launch plan type", fname) + return nil, errors.New(fmt.Sprintf("Failed unmarshalling file %v", fname)) + +} + +func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext) error { + switch message.(type) { + case *admin.LaunchPlan: + launchPlan := message.(*admin.LaunchPlan) + _, err := cmdCtx.AdminClient().CreateLaunchPlan(ctx, &admin.LaunchPlanCreateRequest{ + Id: &core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Project: config.GetConfig().Project, + Domain: config.GetConfig().Domain, + Name: launchPlan.Id.Name, + Version: filesConfig.version, + }, + Spec: launchPlan.Spec, + }) + return err + case *admin.WorkflowSpec: + workflowSpec := message.(*admin.WorkflowSpec) + _, err := cmdCtx.AdminClient().CreateWorkflow(ctx, &admin.WorkflowCreateRequest{ + Id: &core.Identifier{ + ResourceType: core.ResourceType_WORKFLOW, + Project: config.GetConfig().Project, + Domain: config.GetConfig().Domain, + Name: workflowSpec.Template.Id.Name, + Version: filesConfig.version, + }, + Spec: workflowSpec, + }) + return err + case *admin.TaskSpec: + taskSpec := message.(*admin.TaskSpec) + _, err := cmdCtx.AdminClient().CreateTask(ctx, &admin.TaskCreateRequest{ + Id: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: config.GetConfig().Project, + Domain: config.GetConfig().Domain, + Name: taskSpec.Template.Id.Name, + Version: filesConfig.version, + }, + Spec: taskSpec, + }) + return err + default: + return errors.New(fmt.Sprintf("Failed registering unknown entity %v", message)) + } +} + +func hydrateNode(node *core.Node) error { + targetNode := node.Target + switch targetNode.(type) { + case *core.Node_TaskNode: + taskNodeWrapper := targetNode.(*core.Node_TaskNode) + taskNodeReference := taskNodeWrapper.TaskNode.Reference.(*core.TaskNode_ReferenceId) + hydrateIdentifier(taskNodeReference.ReferenceId) + case *core.Node_WorkflowNode: + workflowNodeWrapper := targetNode.(*core.Node_WorkflowNode) + switch workflowNodeWrapper.WorkflowNode.Reference.(type) { + case *core.WorkflowNode_SubWorkflowRef: + subWorkflowNodeReference := workflowNodeWrapper.WorkflowNode.Reference.(*core.WorkflowNode_SubWorkflowRef) + hydrateIdentifier(subWorkflowNodeReference.SubWorkflowRef) + case *core.WorkflowNode_LaunchplanRef: + launchPlanNodeReference := workflowNodeWrapper.WorkflowNode.Reference.(*core.WorkflowNode_LaunchplanRef) + hydrateIdentifier(launchPlanNodeReference.LaunchplanRef) + default: + errors.New(fmt.Sprintf("Unknown type %T", workflowNodeWrapper.WorkflowNode.Reference)) + } + case *core.Node_BranchNode: + branchNodeWrapper := targetNode.(*core.Node_BranchNode) + hydrateNode(branchNodeWrapper.BranchNode.IfElse.Case.ThenNode) + if len(branchNodeWrapper.BranchNode.IfElse.Other) > 0 { + for _, ifBlock := range branchNodeWrapper.BranchNode.IfElse.Other { + hydrateNode(ifBlock.ThenNode) + } + } + switch branchNodeWrapper.BranchNode.IfElse.Default.(type) { + case *core.IfElseBlock_ElseNode: + elseNodeReference := branchNodeWrapper.BranchNode.IfElse.Default.(*core.IfElseBlock_ElseNode) + hydrateNode(elseNodeReference.ElseNode) + case *core.IfElseBlock_Error: + // Do nothing. + default: + return errors.New(fmt.Sprintf("Unknown type %T", branchNodeWrapper.BranchNode.IfElse.Default)) + } + default: + return errors.New(fmt.Sprintf("Unknown type %T", targetNode)) + } + return nil +} + +func hydrateIdentifier(identifier *core.Identifier) { + if identifier.Project == "" || identifier.Project == registrationProjectPattern { + identifier.Project = config.GetConfig().Project + } + if identifier.Domain == "" || identifier.Domain == registrationDomainPattern { + identifier.Domain = config.GetConfig().Domain + } + if identifier.Version == "" || identifier.Version == registrationVersionPattern { + identifier.Version = filesConfig.version + } +} + +func hydrateSpec(message proto.Message) error { + switch message.(type) { + case *admin.LaunchPlan: + launchPlan := message.(*admin.LaunchPlan) + hydrateIdentifier(launchPlan.Spec.WorkflowId) + case *admin.WorkflowSpec: + workflowSpec := message.(*admin.WorkflowSpec) + for _, Noderef := range workflowSpec.Template.Nodes { + if err := hydrateNode(Noderef); err != nil { + return err + } + } + hydrateIdentifier(workflowSpec.Template.Id) + for _, subWorkflow := range workflowSpec.SubWorkflows { + for _, Noderef := range subWorkflow.Nodes { + if err := hydrateNode(Noderef); err != nil { + return err + } + } + hydrateIdentifier(subWorkflow.Id) + } + case *admin.TaskSpec: + taskSpec := message.(*admin.TaskSpec) + hydrateIdentifier(taskSpec.Template.Id) + default: + return errors.New(fmt.Sprintf("Unknown type %T", message)) + } + return nil +} + +func getJsonSpec(message proto.Message) string { + marshaller := jsonpb.Marshaler{ + EnumsAsInts: false, + EmitDefaults: true, + Indent: " ", + OrigName: true, + } + jsonSpec, _ := marshaller.MarshalToString(message) + return jsonSpec +} diff --git a/flytectl/cmd/register/registerfilesconfig_flags.go b/flytectl/cmd/register/registerfilesconfig_flags.go new file mode 100755 index 00000000000..97c58844858 --- /dev/null +++ b/flytectl/cmd/register/registerfilesconfig_flags.go @@ -0,0 +1,46 @@ +// Code generated by go generate; DO NOT EDIT. +// This file was generated by robots. + +package register + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/spf13/pflag" +) + +// If v is a pointer, it will get its element value or the zero value of the element type. +// If v is not a pointer, it will return it as is. +func (RegisterFilesConfig) elemValueOrNil(v interface{}) interface{} { + if t := reflect.TypeOf(v); t.Kind() == reflect.Ptr { + if reflect.ValueOf(v).IsNil() { + return reflect.Zero(t.Elem()).Interface() + } else { + return reflect.ValueOf(v).Interface() + } + } else if v == nil { + return reflect.Zero(t).Interface() + } + + return v +} + +func (RegisterFilesConfig) mustMarshalJSON(v json.Marshaler) string { + raw, err := v.MarshalJSON() + if err != nil { + panic(err) + } + + return string(raw) +} + +// GetPFlagSet will return strongly types pflags for all fields in RegisterFilesConfig and its nested types. The format of the +// flags is json-name.json-sub-name... etc. +func (cfg RegisterFilesConfig) GetPFlagSet(prefix string) *pflag.FlagSet { + cmdFlags := pflag.NewFlagSet("RegisterFilesConfig", pflag.ExitOnError) + cmdFlags.StringVarP(&(filesConfig.version),fmt.Sprintf("%v%v", prefix, "version"), "v", "v1", "version of the entity to be registered with flyte.") + cmdFlags.BoolVarP(&(filesConfig.skipOnError), fmt.Sprintf("%v%v", prefix, "skipOnError"), "s", *new(bool), "fail fast when registering files.") + return cmdFlags +} diff --git a/flytectl/cmd/register/registerfilesconfig_flags_test.go b/flytectl/cmd/register/registerfilesconfig_flags_test.go new file mode 100755 index 00000000000..9c055e83e4b --- /dev/null +++ b/flytectl/cmd/register/registerfilesconfig_flags_test.go @@ -0,0 +1,146 @@ +// Code generated by go generate; DO NOT EDIT. +// This file was generated by robots. + +package register + +import ( + "encoding/json" + "fmt" + "reflect" + "strings" + "testing" + + "github.com/mitchellh/mapstructure" + "github.com/stretchr/testify/assert" +) + +var dereferencableKindsRegisterFilesConfig = map[reflect.Kind]struct{}{ + reflect.Array: {}, reflect.Chan: {}, reflect.Map: {}, reflect.Ptr: {}, reflect.Slice: {}, +} + +// Checks if t is a kind that can be dereferenced to get its underlying type. +func canGetElementRegisterFilesConfig(t reflect.Kind) bool { + _, exists := dereferencableKindsRegisterFilesConfig[t] + return exists +} + +// This decoder hook tests types for json unmarshaling capability. If implemented, it uses json unmarshal to build the +// object. Otherwise, it'll just pass on the original data. +func jsonUnmarshalerHookRegisterFilesConfig(_, to reflect.Type, data interface{}) (interface{}, error) { + unmarshalerType := reflect.TypeOf((*json.Unmarshaler)(nil)).Elem() + if to.Implements(unmarshalerType) || reflect.PtrTo(to).Implements(unmarshalerType) || + (canGetElementRegisterFilesConfig(to.Kind()) && to.Elem().Implements(unmarshalerType)) { + + raw, err := json.Marshal(data) + if err != nil { + fmt.Printf("Failed to marshal Data: %v. Error: %v. Skipping jsonUnmarshalHook", data, err) + return data, nil + } + + res := reflect.New(to).Interface() + err = json.Unmarshal(raw, &res) + if err != nil { + fmt.Printf("Failed to umarshal Data: %v. Error: %v. Skipping jsonUnmarshalHook", data, err) + return data, nil + } + + return res, nil + } + + return data, nil +} + +func decode_RegisterFilesConfig(input, result interface{}) error { + config := &mapstructure.DecoderConfig{ + TagName: "json", + WeaklyTypedInput: true, + Result: result, + DecodeHook: mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + jsonUnmarshalerHookRegisterFilesConfig, + ), + } + + decoder, err := mapstructure.NewDecoder(config) + if err != nil { + return err + } + + return decoder.Decode(input) +} + +func join_RegisterFilesConfig(arr interface{}, sep string) string { + listValue := reflect.ValueOf(arr) + strs := make([]string, 0, listValue.Len()) + for i := 0; i < listValue.Len(); i++ { + strs = append(strs, fmt.Sprintf("%v", listValue.Index(i))) + } + + return strings.Join(strs, sep) +} + +func testDecodeJson_RegisterFilesConfig(t *testing.T, val, result interface{}) { + assert.NoError(t, decode_RegisterFilesConfig(val, result)) +} + +func testDecodeSlice_RegisterFilesConfig(t *testing.T, vStringSlice, result interface{}) { + assert.NoError(t, decode_RegisterFilesConfig(vStringSlice, result)) +} + +func TestRegisterFilesConfig_GetPFlagSet(t *testing.T) { + val := RegisterFilesConfig{} + cmdFlags := val.GetPFlagSet("") + assert.True(t, cmdFlags.HasFlags()) +} + +func TestRegisterFilesConfig_SetFlags(t *testing.T) { + actual := RegisterFilesConfig{} + cmdFlags := actual.GetPFlagSet("") + assert.True(t, cmdFlags.HasFlags()) + + t.Run("Test_version", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vString, err := cmdFlags.GetString("version"); err == nil { + assert.Equal(t, string("v1"), vString) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("version", testValue) + if vString, err := cmdFlags.GetString("version"); err == nil { + testDecodeJson_RegisterFilesConfig(t, fmt.Sprintf("%v", vString), &actual.version) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_skipOnError", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vBool, err := cmdFlags.GetBool("skipOnError"); err == nil { + assert.Equal(t, bool(*new(bool)), vBool) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("skipOnError", testValue) + if vBool, err := cmdFlags.GetBool("skipOnError"); err == nil { + testDecodeJson_RegisterFilesConfig(t, fmt.Sprintf("%v", vBool), &actual.skipOnError) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) +} diff --git a/flytectl/cmd/root.go b/flytectl/cmd/root.go index 11593ba3c01..39f5d25e8dc 100644 --- a/flytectl/cmd/root.go +++ b/flytectl/cmd/root.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "github.com/lyft/flytectl/cmd/register" "github.com/lyft/flytectl/cmd/get" "github.com/lyft/flytectl/pkg/printer" @@ -37,6 +38,7 @@ func newRootCmd() *cobra.Command { rootCmd.AddCommand(viper.GetConfigCommand()) rootCmd.AddCommand(versionCmd) rootCmd.AddCommand(get.CreateGetCommand()) + rootCmd.AddCommand(register.RegisterCommand()) config.GetConfig() return rootCmd diff --git a/flytectl/go.mod b/flytectl/go.mod index e7872d901a0..b2617470b10 100644 --- a/flytectl/go.mod +++ b/flytectl/go.mod @@ -8,7 +8,7 @@ require ( github.com/golang/protobuf v1.3.2 github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 github.com/landoop/tableprinter v0.0.0-20180806200924-8bd8c2576d27 - github.com/lyft/flyteidl v0.18.1 + github.com/lyft/flyteidl v0.18.11 github.com/lyft/flytestdlib v0.3.10-0.20200619054107-45f341b716fa github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mitchellh/mapstructure v1.1.2 diff --git a/flytectl/go.sum b/flytectl/go.sum index 729c0ddc2e5..b89512e6728 100644 --- a/flytectl/go.sum +++ b/flytectl/go.sum @@ -192,6 +192,8 @@ github.com/landoop/tableprinter v0.0.0-20200104100433-ae9249991eb1 h1:SH30nioTpP github.com/landoop/tableprinter v0.0.0-20200104100433-ae9249991eb1/go.mod h1:f0X1c0za3TbET/rl5ThtCSel0+G3/yZ8iuU9BxnyVK0= github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio= github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.18.11 h1:24NaFYWxANhRbwKfvkgu8axGTWUcl1tgZBqNJutKNJ8= +github.com/lyft/flyteidl v0.18.11/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.10-0.20200619054107-45f341b716fa h1:anYLG/feMdMv321AfhHGBRd0S3cPLFFTZvJYssjhxFo= github.com/lyft/flytestdlib v0.3.10-0.20200619054107-45f341b716fa/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/flytectl/pkg/printer/printer.go b/flytectl/pkg/printer/printer.go index e1d57d32667..a41d6bcb92d 100644 --- a/flytectl/pkg/printer/printer.go +++ b/flytectl/pkg/printer/printer.go @@ -73,7 +73,7 @@ func projectColumns(rows []interface{}, column []Column) ([][]string, error) { return responses, nil } -func JSONToTable(jsonRows []byte, columns []Column) error { +func (p Printer) JSONToTable(jsonRows []byte, columns []Column) error { var rawRows []interface{} if err := json.Unmarshal(jsonRows, &rawRows); err != nil { return errors.Wrapf("JSONUnmarshalFailure", err, "failed to unmarshal into []interface{} from json") @@ -90,6 +90,9 @@ func JSONToTable(jsonRows []byte, columns []Column) error { printer.AutoWrapText = false printer.BorderLeft = true printer.BorderRight = true + printer.BorderBottom = true + printer.BorderTop = true + printer.RowLine = true printer.ColumnSeparator = "|" printer.HeaderBgColor = tablewriter.BgHiWhiteColor headers := make([]string, 0, len(columns)) @@ -141,7 +144,7 @@ func (p Printer) Print(format OutputFormat, columns []Column, messages ...proto. if err != nil { return errors.Wrapf("ProtoToJSONFailure", err, "failed to marshal proto messages") } - return JSONToTable(rows, columns) + return p.JSONToTable(rows, columns) } return nil }