Skip to content

Commit

Permalink
Add meroxa functions (#239)
Browse files Browse the repository at this point in the history
* Create function

* List functions

* Get function

* Delete function

* Display env vars

* Fix vet
  • Loading branch information
owenthereal authored Jan 26, 2022
1 parent ede7bfa commit bac2204
Show file tree
Hide file tree
Showing 26 changed files with 1,186 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/meroxa/root/environments/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestListEnvironmentsExecution(t *testing.T) {
Type: meroxa.EnvironmentTypePrivate,
Name: "environment-1234",
Provider: meroxa.EnvironmentProviderAws,
Region: meroxa.EnvironmentRegionUsEast2,
Region: meroxa.EnvironmentRegionUsEast1,
Status: meroxa.EnvironmentViewStatus{State: meroxa.EnvironmentStateProvisioned},
UUID: "531428f7-4e86-4094-8514-d397d49026f7",
}
Expand Down
143 changes: 143 additions & 0 deletions cmd/meroxa/root/functions/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package functions

import (
"context"
"fmt"
"strings"

"github.com/mattn/go-shellwords"
"github.com/meroxa/cli/cmd/meroxa/builder"
"github.com/meroxa/cli/log"
"github.com/meroxa/meroxa-go/pkg/meroxa"
)

var (
_ builder.CommandWithDocs = (*Create)(nil)
_ builder.CommandWithArgs = (*Create)(nil)
_ builder.CommandWithFlags = (*Create)(nil)
_ builder.CommandWithClient = (*Create)(nil)
_ builder.CommandWithLogger = (*Create)(nil)
_ builder.CommandWithExecute = (*Create)(nil)
)

type createFunctionClient interface {
CreateFunction(ctx context.Context, input *meroxa.CreateFunctionInput) (*meroxa.Function, error)
}

type Create struct {
client createFunctionClient
logger log.Logger

args struct {
Name string
}

flags struct {
InputStream string `long:"input-stream" usage:"an input stream to the function" required:"true"`
Image string `long:"image" usage:"Docker image name" required:"true"`
Command string `long:"command" usage:"Entrypoint command"`
Args string `long:"args" usage:"Arguments to the entrypoint"`
EnvVars []string `long:"env" usage:"List of environment variables to set in the function"`
Pipeline string `long:"pipeline" usage:"pipeline name to attach function to" required:"true"`
}
}

func (c *Create) Usage() string {
return "create [NAME] [flags]"
}

func (c *Create) Docs() builder.Docs {
return builder.Docs{
Short: "Create a function",
Long: "Use `functions create` to create a function to process records from an input steram (--input-stream)",
Example: `
meroxa functions create [NAME] --input-stream connector-output-stream --image myimage --pipeline my-pipeline
meroxa functions create [NAME] --input-stream connector-output-stream --image myimage --pipeline my-pipeline --env FOO=BAR --env BAR=BAZ
`,
}
}

func (c *Create) Execute(ctx context.Context) error {
envVars, err := c.parseEnvVars(c.flags.EnvVars)
if err != nil {
return err
}

var (
command []string
args []string
)
if cmd := c.flags.Command; cmd != "" {
command, err = shellwords.Parse(cmd)
if err != nil {
return err
}
}
if a := c.flags.Args; a != "" {
args, err = shellwords.Parse(a)
if err != nil {
return err
}
}

fun, err := c.client.CreateFunction(
ctx,
&meroxa.CreateFunctionInput{
Name: c.args.Name,
InputStream: c.flags.InputStream,
Pipeline: meroxa.PipelineIdentifier{
Name: c.flags.Pipeline,
},
Image: c.flags.Image,
Command: command,
Args: args,
EnvVars: envVars,
},
)
if err != nil {
return err
}

c.logger.Infof(ctx, "Function %q successfully created!\n", fun.Name)
c.logger.JSON(ctx, fun)

return nil
}

func (c *Create) parseEnvVars(envVars []string) (map[string]string, error) {
m := make(map[string]string)
for _, ev := range envVars {
var (
split = strings.SplitN(ev, "=", 2) //nolint
key = split[0]
val = split[1]
)

if key == "" || val == "" {
return nil, fmt.Errorf("error parsing env var %q", ev)
}

m[key] = val
}

return m, nil
}

func (c *Create) Client(client meroxa.Client) {
c.client = client
}

func (c *Create) Logger(logger log.Logger) {
c.logger = logger
}

func (c *Create) Flags() []builder.Flag {
return builder.BuildFlags(&c.flags)
}

func (c *Create) ParseArgs(args []string) error {
if len(args) > 0 {
c.args.Name = args[0]
}
return nil
}
25 changes: 25 additions & 0 deletions cmd/meroxa/root/functions/create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package functions

import (
"testing"

"github.com/google/go-cmp/cmp"
)

func TestCreate(t *testing.T) {
c := &Create{}

m, err := c.parseEnvVars([]string{"KEY=VAL"})
if err != nil {
t.Fatal(err)
}

if diff := cmp.Diff(map[string]string{"KEY": "VAL"}, m); diff != "" {
t.Fatalf("mismatch of parsed env vars (-want +got): %s", diff)
}

_, err = c.parseEnvVars([]string{"=VAL"})
if err == nil {
t.Fatal("error should not be nil")
}
}
71 changes: 71 additions & 0 deletions cmd/meroxa/root/functions/describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package functions

import (
"context"
"errors"

"github.com/meroxa/cli/cmd/meroxa/builder"
"github.com/meroxa/cli/log"
"github.com/meroxa/cli/utils"
"github.com/meroxa/meroxa-go/pkg/meroxa"
)

var (
_ builder.CommandWithDocs = (*Describe)(nil)
_ builder.CommandWithArgs = (*Describe)(nil)
_ builder.CommandWithClient = (*Describe)(nil)
_ builder.CommandWithLogger = (*Describe)(nil)
_ builder.CommandWithExecute = (*Describe)(nil)
)

type describeFunctionClient interface {
GetFunction(ctx context.Context, nameOrUUID string) (*meroxa.Function, error)
}

type Describe struct {
client describeFunctionClient
logger log.Logger

args struct {
NameOrUUID string
}
}

func (d *Describe) Usage() string {
return "describe [NAMEorUUID]"
}

func (d *Describe) Docs() builder.Docs {
return builder.Docs{
Short: "Describe function",
}
}

func (d *Describe) Execute(ctx context.Context) error {
fun, err := d.client.GetFunction(ctx, d.args.NameOrUUID)
if err != nil {
return err
}

d.logger.Info(ctx, utils.FunctionTable(fun))
d.logger.JSON(ctx, fun)

return nil
}

func (d *Describe) Client(client meroxa.Client) {
d.client = client
}

func (d *Describe) Logger(logger log.Logger) {
d.logger = logger
}

func (d *Describe) ParseArgs(args []string) error {
if len(args) < 1 {
return errors.New("requires function name")
}

d.args.NameOrUUID = args[0]
return nil
}
49 changes: 49 additions & 0 deletions cmd/meroxa/root/functions/functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package functions

import (
"fmt"

"github.com/meroxa/cli/cmd/meroxa/builder"
"github.com/spf13/cobra"
)

type Functions struct{}

var (
_ builder.CommandWithAliases = (*Functions)(nil)
_ builder.CommandWithDocs = (*Functions)(nil)
_ builder.CommandWithFeatureFlag = (*Functions)(nil)
_ builder.CommandWithSubCommands = (*Functions)(nil)
_ builder.CommandWithHidden = (*Functions)(nil)
)

func (*Functions) Usage() string {
return "functions"
}

func (*Functions) Hidden() bool {
return true
}

func (*Functions) FeatureFlag() (string, error) {
return "functions", fmt.Errorf(`no access to the Meroxa functions feature`)
}

func (*Functions) Docs() builder.Docs {
return builder.Docs{
Short: "Manage functions on Meroxa",
}
}

func (*Functions) Aliases() []string {
return []string{"function"}
}

func (*Functions) SubCommands() []*cobra.Command {
return []*cobra.Command{
builder.BuildCobraCommand(&Create{}),
builder.BuildCobraCommand(&List{}),
builder.BuildCobraCommand(&Describe{}),
builder.BuildCobraCommand(&Remove{}),
}
}
67 changes: 67 additions & 0 deletions cmd/meroxa/root/functions/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package functions

import (
"context"

"github.com/meroxa/cli/cmd/meroxa/builder"
"github.com/meroxa/cli/log"
"github.com/meroxa/cli/utils"
"github.com/meroxa/meroxa-go/pkg/meroxa"
)

var (
_ builder.CommandWithDocs = (*List)(nil)
_ builder.CommandWithClient = (*List)(nil)
_ builder.CommandWithLogger = (*List)(nil)
_ builder.CommandWithExecute = (*List)(nil)
_ builder.CommandWithAliases = (*List)(nil)
_ builder.CommandWithNoHeaders = (*List)(nil)
)

type listFunctionClient interface {
ListFunctions(ctx context.Context) ([]*meroxa.Function, error)
}

type List struct {
client listFunctionClient
logger log.Logger
hideHeaders bool
}

func (l *List) Execute(ctx context.Context) error {
funs, err := l.client.ListFunctions(ctx)
if err != nil {
return err
}

l.logger.JSON(ctx, funs)
l.logger.Info(ctx, utils.FunctionsTable(funs, l.hideHeaders))

return nil
}

func (l *List) Usage() string {
return "list"
}

func (l *List) Docs() builder.Docs {
return builder.Docs{
Short: "List functions",
}
}

func (l *List) Aliases() []string {
return []string{"ls"}
}

func (l *List) Client(client meroxa.Client) {
l.client = client
}

func (l *List) Logger(logger log.Logger) {
l.logger = logger
}

func (l *List) HideHeaders(hide bool) {
l.hideHeaders = hide
}
Loading

0 comments on commit bac2204

Please sign in to comment.