Skip to content

Commit

Permalink
Deploy multiple (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrancoisPoinsot authored Jan 30, 2019
1 parent 381d081 commit 96d47e6
Show file tree
Hide file tree
Showing 19 changed files with 1,475 additions and 385 deletions.
99 changes: 70 additions & 29 deletions cli/cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package cmd

import (
"encoding/json"
"errors"
"github.com/pkg/errors"
"io/ioutil"
"log"
"os"
"path"
"strings"

"github.com/ricardo-ch/go-kafka-connect/lib/connectors"
Expand All @@ -28,61 +31,99 @@ import (
var createCmd = &cobra.Command{
Use: "create",
Short: "Create a new connector",
Long: `Create a connector using either a config file or a literal string
flags:
--url -u: url of the kafka-connect server
--file -f: path to the config file
--string -s: JSON configuration string
--sync -y: execute synchronously
`,
RunE: RunECreate,
RunE: RunECreate,
}

//RunECreate ...
func RunECreate(cmd *cobra.Command, args []string) error {
config, err := getCreateCmdConfig(cmd)
configs, err := getCreateCmdConfig(cmd)
if err != nil {
return err
}

resp, err := getClient().CreateConnector(config, sync)
if err != nil {
return err
//TODO was not expecting I would have to update CreateConnector when adding multiple file deployment feature
// will have to add properly later
for _, config := range configs {
resp, err := getClient().CreateConnector(config, sync)
printResponse(resp)
if err != nil {
return err
}
}

return printResponse(resp)
return nil
}

func getCreateCmdConfig(cmd *cobra.Command) (connectors.CreateConnectorRequest, error) {
config := connectors.CreateConnectorRequest{}
func getCreateCmdConfig(cmd *cobra.Command) ([]connectors.CreateConnectorRequest, error) {
var configs []connectors.CreateConnectorRequest

if cmd.Flag("file").Changed {
fileReader, err := os.Open(file)
if cmd.Flag("path").Changed {
fileInfo, err := os.Stat(filePath)
if err != nil {
return config, err
return nil, errors.Wrapf(err, "error while trying to find input or folder: %v", filePath)
}

err = json.NewDecoder(fileReader).Decode(&config)
if err != nil {
return config, err
if fileInfo.IsDir() {
configs, err = getConfigFromFolder(filePath)
if err != nil {
return nil, err
}
} else {
config, err := getConfigFromFile(filePath)
if err != nil {
return nil, err
}
configs = append(configs, config)
}

} else if cmd.Flag("string").Changed {
config := connectors.CreateConnectorRequest{}
err := json.NewDecoder(strings.NewReader(configString)).Decode(&config)
if err != nil {
return config, err
return nil, err
}
configs = append(configs, config)
} else {
return config, errors.New("neither file nor string was supplied")
return nil, errors.New("neither path nor string was supplied")
}
return configs, nil
}

func getConfigFromFolder(folderPath string) ([]connectors.CreateConnectorRequest, error) {
configs := []connectors.CreateConnectorRequest{}
configFiles, err := ioutil.ReadDir(folderPath)
if err != nil {
return configs, err
}
for _, fileInfo := range configFiles {
if fileInfo.IsDir() {
log.Printf("found unexpected subfolder in folder: %s. This command will not search through it.", filePath)
continue
}
config, err := getConfigFromFile(path.Join(folderPath, fileInfo.Name()))
if err != nil {
log.Printf("found unexpected not config file in folder: %s", filePath)
} else {
configs = append(configs, config)
}
}
return config, nil
return configs, nil
}

func getConfigFromFile(filePath string) (connectors.CreateConnectorRequest, error) {
config := connectors.CreateConnectorRequest{}
fileReader, err := os.Open(filePath)
if err != nil {
return config, err
}

err = json.NewDecoder(fileReader).Decode(&config)
return config, err
}

func init() {
RootCmd.AddCommand(createCmd)

createCmd.PersistentFlags().StringVarP(&file, "file", "f", "", "path to the config file")
createCmd.MarkFlagFilename("file")
createCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file")
createCmd.MarkFlagFilename("path")
createCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string")
createCmd.PersistentFlags().BoolVarP(&sync, "sync", "y", false, "execute synchronously")

Expand Down
7 changes: 1 addition & 6 deletions cli/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
var deleteCmd = &cobra.Command{
Use: "delete",
Short: "Delete an existing connector",
Long: `Delete an existing connector
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--sync -y: execute synchronously`,
RunE: RunEDelete,
RunE: RunEDelete,
}

//RunEDelete ...
Expand Down
14 changes: 5 additions & 9 deletions cli/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,23 @@ var deployCmd = &cobra.Command{
Use: "deploy",
Short: "Deploy a new connector",
Long: `Deploy a new connector or replace the old version if it alrerady exists.
This command is executes all its steps synchronously.
flags:
--url -u: url of the kafka-connect server
--file -f: path to the config file
--string -s: literal configuration string`,
This command is executes all its steps synchronously.`,
RunE: RunEDeploy,
}

func RunEDeploy(cmd *cobra.Command, args []string) error {
config, err := getCreateCmdConfig(cmd)
configs, err := getCreateCmdConfig(cmd)
if err != nil {
return err
}

return getClient().DeployConnector(config)
return getClient().DeployMultipleConnector(configs)
}

func init() {
RootCmd.AddCommand(deployCmd)

deployCmd.PersistentFlags().StringVarP(&file, "file", "f", "", "path to the config file")
deployCmd.MarkFlagFilename("file")
deployCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file or folder")
deployCmd.MarkFlagFilename("path")
deployCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string")
}
12 changes: 3 additions & 9 deletions cli/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ var getCmd = &cobra.Command{
Use: "get",
Short: "Retrieve information from kafka-connect",
Long: `Get reads from the kafka-connect REST API.
It can get the list of all deployed connectors, or details about a single one.
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--status -s: get the connector's status (requires -n)
--config -c: get the connector's config (requires -n)
--tasks -t: get the connector's tasks list (requires -n)`,
It can get the list of all deployed connectors, or details about a single one.`,
RunE: handleCmd,
}

Expand All @@ -57,10 +51,10 @@ func handleCmd(cmd *cobra.Command, args []string) error {

func validateArgs() error {
if connector == "" {
return errors.New("Please specify the target connector's name")
return errors.New("please specify the target connector's name")
}
if (status && config) || (status && tasks) || (config && tasks) {
return errors.New("More than one action were provided")
return errors.New("more than one action were provided")
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions cli/cmd/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ func printResponse(response interface{}) error {
return nil
}

func getClient() *connectors.Client {
func getClient() connectors.HighLevelClient {
client := connectors.NewClient(url)
if verbose {
client = client.WithDebug()
client.SetDebug()
}
if SSLInsecure {
client = client.WithInsecureSSL()
client.SetInsecureSSL()
}
return client
}
7 changes: 1 addition & 6 deletions cli/cmd/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
var pauseCmd = &cobra.Command{
Use: "pause",
Short: "Pause a connector",
Long: `Suspend a connector without deleting it.
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--sync -y: execute synchronously`,
RunE: RunEPause,
RunE: RunEPause,
}

//RunEPause ...
Expand Down
7 changes: 1 addition & 6 deletions cli/cmd/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
var resumeCmd = &cobra.Command{
Use: "resume",
Short: "Resume a connector",
Long: `Resume a paused connector.
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--sync -y: execute synchronously`,
RunE: RunEResume,
RunE: RunEResume,
}

//RunEResume ...
Expand Down
3 changes: 1 addition & 2 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import (
var (
url string
connector string
file string
filePath string
configString string
sync bool
status bool
config bool
tasks bool
verbose bool
SSLInsecure bool
output string
)

var RootCmd = &cobra.Command{
Expand Down
17 changes: 5 additions & 12 deletions cli/cmd/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,7 @@ var update updateCmdConfig
var updateCmd = &cobra.Command{
Use: "update",
Short: "Updater a connector",
Long: `Update a connector's configuration
flags:
--url -u: url of the kafka-connect server
--connector -n: name of the target connector
--file -f: path to the config file
--string -s: literal configuration string
--sync -y: execute synchronously`,
RunE: RunEUpdate,
RunE: RunEUpdate,
}

//RunEUpdate ...
Expand All @@ -68,7 +61,7 @@ func RunEUpdate(cmd *cobra.Command, args []string) error {
func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) {
config := map[string]interface{}{}

if cmd.Flag("file").Changed {
if cmd.Flag("path").Changed {
fileReader, err := os.Open(update.file)
if err != nil {
return config, err
Expand All @@ -85,16 +78,16 @@ func getUpdateCmdConfig(cmd *cobra.Command) (map[string]interface{}, error) {
return config, err
}
} else {
return config, errors.New("neither file nor string was supplied")
return config, errors.New("neither input nor string was supplied")
}
return config, nil
}

func init() {
RootCmd.AddCommand(updateCmd)

updateCmd.PersistentFlags().StringVarP(&update.file, "file", "f", "", "path to the config file")
updateCmd.MarkFlagFilename("file")
updateCmd.PersistentFlags().StringVarP(&update.file, "path", "p", "", "path to the config file")
updateCmd.MarkFlagFilename("path")
updateCmd.PersistentFlags().StringVarP(&update.configString, "string", "s", "", "JSON configuration string")
updateCmd.PersistentFlags().StringVarP(&update.connector, "connector", "n", "", "name of the target connector")
updateCmd.MarkFlagRequired("connector")
Expand Down
14 changes: 13 additions & 1 deletion lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:
.PHONY: test-integration
test-integration:
make rundep
go test -v `go list ./... | grep -v /vendor/` -tags=integration
go test -tags=integration -count=1 ./...

.PHONY: rundep
rundep:
Expand All @@ -19,3 +19,15 @@ rundep:
sleep 2; \
done
@echo "up and running"

MOCKERY_PATH := $(shell [ -z "$${GOBIN}" ] && echo $${GOPATH}/bin/mockery || echo $${GOBIN}/mockery; )

.PHONY: update-mocks
update-mocks:
go get github.com/vektra/mockery/...
${MOCKERY_PATH} -inpkg -case "underscore" -recursive -all -note "NOTE: run 'make update-mocks' from this project top folder to update this file and generate new ones."

.PHONY: test-unit
test-unit:
go test -tags=unit ./...

Loading

0 comments on commit 96d47e6

Please sign in to comment.