Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deploy multiple #32

Merged
merged 10 commits into from
Jan 30, 2019
99 changes: 70 additions & 29 deletions cli/cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package cmd

import (
"encoding/json"
"errors"
"github.com/pkg/errors"
"io/ioutil"
"log"
"os"
"path"
"strings"

"github.com/ricardo-ch/go-kafka-connect/lib/connectors"
Expand All @@ -28,61 +31,99 @@ import (
var createCmd = &cobra.Command{
Use: "create",
Short: "Create a new connector",
Long: `Create a connector using either a config file or a literal string
flags:
--url -u: url of the kafka-connect server
--file -f: path to the config file
--string -s: JSON configuration string
--sync -y: execute synchronously
`,
RunE: RunECreate,
RunE: RunECreate,
}

//RunECreate ...
func RunECreate(cmd *cobra.Command, args []string) error {
config, err := getCreateCmdConfig(cmd)
configs, err := getCreateCmdConfig(cmd)
if err != nil {
return err
}

resp, err := getClient().CreateConnector(config, sync)
if err != nil {
return err
//TODO was not expecting I would have to update CreateConnector when adding multiple file deployment feature
// will have to add properly later
for _, config := range configs {
resp, err := getClient().CreateConnector(config, sync)
printResponse(resp)
if err != nil {
return err
}
}

return printResponse(resp)
return nil
}

func getCreateCmdConfig(cmd *cobra.Command) (connectors.CreateConnectorRequest, error) {
config := connectors.CreateConnectorRequest{}
func getCreateCmdConfig(cmd *cobra.Command) ([]connectors.CreateConnectorRequest, error) {
var configs []connectors.CreateConnectorRequest

if cmd.Flag("file").Changed {
fileReader, err := os.Open(file)
if cmd.Flag("path").Changed {
fileInfo, err := os.Stat(filePath)
if err != nil {
return config, err
return nil, errors.Wrapf(err, "error while trying to find input or folder: %v", filePath)
}

err = json.NewDecoder(fileReader).Decode(&config)
if err != nil {
return config, err
if fileInfo.IsDir() {
configs, err = getConfigFromFolder(filePath)
if err != nil {
return nil, err
}
} else {
config, err := getConfigFromFile(filePath)
if err != nil {
return nil, err
}
configs = append(configs, config)
}

} else if cmd.Flag("string").Changed {
config := connectors.CreateConnectorRequest{}
err := json.NewDecoder(strings.NewReader(configString)).Decode(&config)
if err != nil {
return config, err
return nil, err
}
configs = append(configs, config)
} else {
return config, errors.New("neither file nor string was supplied")
return nil, errors.New("neither path nor string was supplied")
}
return configs, nil
}

func getConfigFromFolder(folderPath string) ([]connectors.CreateConnectorRequest, error) {
configs := []connectors.CreateConnectorRequest{}
configFiles, err := ioutil.ReadDir(folderPath)
if err != nil {
return configs, err
}
for _, fileInfo := range configFiles {
if fileInfo.IsDir() {
log.Printf("found unexpected subfolder in folder: %s. This command will not search through it.", filePath)
continue
}
config, err := getConfigFromFile(path.Join(folderPath, fileInfo.Name()))
if err != nil {
log.Printf("found unexpected not config file in folder: %s", filePath)
} else {
configs = append(configs, config)
}
}
return config, nil
return configs, nil
}

func getConfigFromFile(filePath string) (connectors.CreateConnectorRequest, error) {
config := connectors.CreateConnectorRequest{}
fileReader, err := os.Open(filePath)
if err != nil {
return config, err
}

err = json.NewDecoder(fileReader).Decode(&config)
return config, err
}

func init() {
RootCmd.AddCommand(createCmd)

createCmd.PersistentFlags().StringVarP(&file, "file", "f", "", "path to the config file")
createCmd.MarkFlagFilename("file")
createCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file")
createCmd.MarkFlagFilename("path")
createCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string")
createCmd.PersistentFlags().BoolVarP(&sync, "sync", "y", false, "execute synchronously")

Expand Down
7 changes: 1 addition & 6 deletions cli/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
var deleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete an existing connector",
Long: `Delete an existing connector
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--sync -y: execute synchronously`,
RunE: RunEDelete,
RunE: RunEDelete,
}

//RunEDelete ...
Expand Down
14 changes: 5 additions & 9 deletions cli/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,23 @@ var deployCmd = &cobra.Command{
Use: "deploy",
Short: "Deploy a new connector",
Long: `Deploy a new connector or replace the old version if it alrerady exists.
This command is executes all its steps synchronously.
flags:
--url -u: url of the kafka-connect server
--file -f: path to the config file
--string -s: literal configuration string`,
This command is executes all its steps synchronously.`,
RunE: RunEDeploy,
}

func RunEDeploy(cmd *cobra.Command, args []string) error {
config, err := getCreateCmdConfig(cmd)
configs, err := getCreateCmdConfig(cmd)
if err != nil {
return err
}

return getClient().DeployConnector(config)
return getClient().DeployMultipleConnector(configs)
}

func init() {
RootCmd.AddCommand(deployCmd)

deployCmd.PersistentFlags().StringVarP(&file, "file", "f", "", "path to the config file")
deployCmd.MarkFlagFilename("file")
deployCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file or folder")
deployCmd.MarkFlagFilename("path")
deployCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string")
}
12 changes: 3 additions & 9 deletions cli/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ var getCmd = &cobra.Command{
Use: "get",
Short: "Retrieve information from kafka-connect",
Long: `Get reads from the kafka-connect REST API.
It can get the list of all deployed connectors, or details about a single one.
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--status -s: get the connector's status (requires -n)
--config -c: get the connector's config (requires -n)
--tasks -t: get the connector's tasks list (requires -n)`,
It can get the list of all deployed connectors, or details about a single one.`,
RunE: handleCmd,
}

Expand All @@ -57,10 +51,10 @@ func handleCmd(cmd *cobra.Command, args []string) error {

func validateArgs() error {
if connector == "" {
return errors.New("Please specify the target connector's name")
return errors.New("please specify the target connector's name")
}
if (status && config) || (status && tasks) || (config && tasks) {
return errors.New("More than one action were provided")
return errors.New("more than one action were provided")
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions cli/cmd/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ func printResponse(response interface{}) error {
return nil
}

func getClient() *connectors.Client {
func getClient() connectors.HighLevelClient {
client := connectors.NewClient(url)
if verbose {
client = client.WithDebug()
client.SetDebug()
}
if SSLInsecure {
client = client.WithInsecureSSL()
client.SetInsecureSSL()
}
return client
}
7 changes: 1 addition & 6 deletions cli/cmd/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
var pauseCmd = &cobra.Command{
Use: "pause",
Short: "Pause a connector",
Long: `Suspend a connector without deleting it.
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--sync -y: execute synchronously`,
RunE: RunEPause,
RunE: RunEPause,
}

//RunEPause ...
Expand Down
7 changes: 1 addition & 6 deletions cli/cmd/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
var resumeCmd = &cobra.Command{
Use: "resume",
Short: "Resume a connector",
Long: `Resume a paused connector.
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--sync -y: execute synchronously`,
RunE: RunEResume,
RunE: RunEResume,
}

//RunEResume ...
Expand Down
3 changes: 1 addition & 2 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import (
var (
url string
connector string
file string
filePath string
configString string
sync bool
status bool
config bool
tasks bool
verbose bool
SSLInsecure bool
output string
)

var RootCmd = &cobra.Command{
Expand Down
17 changes: 5 additions & 12 deletions cli/cmd/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,7 @@ var update updateCmdConfig
var updateCmd = &cobra.Command{
Use: "update",
Short: "Updater a connector",
Long: `Update a connector's configuration
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--file -f: path to the config file
--string -s: literal configuration string
--sync -y: execute synchronously`,
RunE: RunEUpdate,
RunE: RunEUpdate,
}

//RunEUpdate ...
Expand All @@ -68,7 +61,7 @@ func RunEUpdate(cmd *cobra.Command, args []string) error {
func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) {
config := map[string]interface{}{}

if cmd.Flag("file").Changed {
if cmd.Flag("path").Changed {
fileReader, err := os.Open(update.file)
if err != nil {
return config, err
Expand All @@ -85,16 +78,16 @@ func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) {
return config, err
}
} else {
return config, errors.New("neither file nor string was supplied")
return config, errors.New("neither input nor string was supplied")
}
return config, nil
}

func init() {
RootCmd.AddCommand(updateCmd)

updateCmd.PersistentFlags().StringVarP(&update.file, "file", "f", "", "path to the config file")
updateCmd.MarkFlagFilename("file")
updateCmd.PersistentFlags().StringVarP(&update.file, "path", "p", "", "path to the config file")
updateCmd.MarkFlagFilename("path")
updateCmd.PersistentFlags().StringVarP(&update.configString, "string", "s", "", "JSON configuration string")
updateCmd.PersistentFlags().StringVarP(&update.connector, "connector", "n", "", "name of the target connector")
updateCmd.MarkFlagRequired("connector")
Expand Down
14 changes: 13 additions & 1 deletion lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:
.PHONY: test-integration
test-integration:
make rundep
go test -v `go list ./... | grep -v /vendor/` -tags=integration
go test -tags=integration -count=1 ./...

.PHONY: rundep
rundep:
Expand All @@ -19,3 +19,15 @@ rundep:
sleep 2; \
done
@echo "up and running"

MOCKERY_PATH := $(shell [ -z "$${GOBIN}" ] && echo $${GOPATH}/bin/mockery || echo $${GOBIN}/mockery; )

.PHONY: update-mocks
update-mocks:
go get github.com/vektra/mockery/...
${MOCKERY_PATH} -inpkg -case "underscore" -recursive -all -note "NOTE: run 'make update-mocks' from this project top folder to update this file and generate new ones."

.PHONY: test-unit
test-unit:
go test -tags=unit ./...

Loading