Skip to content

Commit

Permalink
fix few bugs for multiple deployment (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrancoisPoinsot authored Jan 31, 2019
1 parent 96d47e6 commit a3b9cb6
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 32 deletions.
51 changes: 49 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion cli/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func RunEDeploy(cmd *cobra.Command, args []string) error {
return err
}

return getClient().DeployMultipleConnector(configs)
client := getClient()
client.SetParallelism(parallel)

return client.DeployMultipleConnector(configs)
}

func init() {
Expand All @@ -42,4 +45,5 @@ func init() {
deployCmd.PersistentFlags().StringVarP(&filePath, "path", "p", "", "path to the config file or folder")
deployCmd.MarkFlagFilename("path")
deployCmd.PersistentFlags().StringVarP(&configString, "string", "s", "", "JSON configuration string")
deployCmd.PersistentFlags().IntVarP(&parallel, "parallel", "r", 3, "limit of parallel call to kafka-connect")
}
1 change: 1 addition & 0 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
tasks bool
verbose bool
SSLInsecure bool
parallel int
)

var RootCmd = &cobra.Command{
Expand Down
60 changes: 32 additions & 28 deletions lib/connectors/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectors
import (
"crypto/tls"
"fmt"
"github.com/pkg/errors"
"gopkg.in/resty.v1"
"strconv"
"time"
Expand Down Expand Up @@ -56,8 +57,10 @@ func newBaseClient(url string) BaseClient {
SetError(ErrorResponse{}).
SetHostURL(url).
SetHeader("Accept", "application/json").
SetRetryCount(3).
SetTimeout(5 * time.Second).
SetRetryCount(5).
SetRetryWaitTime(500 * time.Millisecond).
SetRetryMaxWaitTime(5 * time.Second).
SetTimeout(10 * time.Second).
AddRetryCondition(func(resp *resty.Response) (bool, error) {
return resp.StatusCode() == 409, nil
})
Expand Down Expand Up @@ -124,8 +127,8 @@ func (c *baseClient) GetAll() (GetAllConnectorsResponse, error) {
if err != nil {
return GetAllConnectorsResponse{}, err
}
if resp.Error() != nil {
return GetAllConnectorsResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return GetAllConnectorsResponse{}, errors.Errorf("Get all connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -145,8 +148,9 @@ func (c *baseClient) GetConnector(req ConnectorRequest) (ConnectorResponse, erro
if err != nil {
return ConnectorResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return ConnectorResponse{}, resp.Error().(*ErrorResponse)

if resp.StatusCode() >= 400 && resp.StatusCode() != 404 {
return ConnectorResponse{}, errors.Errorf("Get connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -164,8 +168,8 @@ func (c *baseClient) CreateConnector(req CreateConnectorRequest) (ConnectorRespo
if err != nil {
return ConnectorResponse{}, err
}
if resp.Error() != nil {
return ConnectorResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return ConnectorResponse{}, errors.Errorf("Create connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -185,8 +189,8 @@ func (c *baseClient) UpdateConnector(req CreateConnectorRequest) (ConnectorRespo
if err != nil {
return ConnectorResponse{}, err
}
if resp.Error() != nil {
return ConnectorResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return ConnectorResponse{}, errors.Errorf("Update connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -205,8 +209,8 @@ func (c *baseClient) DeleteConnector(req ConnectorRequest) (EmptyResponse, error
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return EmptyResponse{}, errors.Errorf("Delete connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -226,8 +230,8 @@ func (c *baseClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfi
if err != nil {
return GetConnectorConfigResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return GetConnectorConfigResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 && resp.StatusCode() != 404 {
return GetConnectorConfigResponse{}, errors.Errorf("Get connector config : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -246,8 +250,8 @@ func (c *baseClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatu
if err != nil {
return GetConnectorStatusResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return GetConnectorStatusResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 && resp.StatusCode() != 404 {
return GetConnectorStatusResponse{}, errors.Errorf("Get connector status : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -265,8 +269,8 @@ func (c *baseClient) RestartConnector(req ConnectorRequest) (EmptyResponse, erro
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return EmptyResponse{}, errors.Errorf("Restart connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -285,8 +289,8 @@ func (c *baseClient) PauseConnector(req ConnectorRequest) (EmptyResponse, error)
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return EmptyResponse{}, errors.Errorf("Pause connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -306,8 +310,8 @@ func (c *baseClient) ResumeConnector(req ConnectorRequest) (EmptyResponse, error
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return EmptyResponse{}, errors.Errorf("Resume connector : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand Down Expand Up @@ -366,8 +370,8 @@ func (c *baseClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, err
if err != nil {
return GetAllTasksResponse{}, err
}
if resp.Error() != nil {
return GetAllTasksResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return GetAllTasksResponse{}, errors.Errorf("Get all tasks : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -385,8 +389,8 @@ func (c *baseClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error)
if err != nil {
return TaskStatusResponse{}, err
}
if resp.Error() != nil && resp.StatusCode() != 404 {
return TaskStatusResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 && resp.StatusCode() != 404 {
return TaskStatusResponse{}, errors.Errorf("Get task status : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand All @@ -405,8 +409,8 @@ func (c *baseClient) RestartTask(req TaskRequest) (EmptyResponse, error) {
if err != nil {
return EmptyResponse{}, err
}
if resp.Error() != nil {
return EmptyResponse{}, resp.Error().(*ErrorResponse)
if resp.StatusCode() >= 400 {
return EmptyResponse{}, errors.Errorf("Restart task : %v", resp.String())
}

result.Code = resp.StatusCode()
Expand Down
46 changes: 46 additions & 0 deletions lib/connectors/base_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package connectors

import (
"github.com/stretchr/testify/assert"
"gopkg.in/jarcoal/httpmock.v1"
"net/http"
"testing"
)

// This test a side effect of resty: when retry on 409 the error of response is not reinitialized
func Test_Get(t *testing.T) {

client := newBaseClient("http://randomurl")
// mock HTTP response
{
typedClient := client.(*baseClient)
httpmock.Reset()
httpmock.ActivateNonDefault(typedClient.restClient.GetClient())
defer httpmock.DeactivateAndReset()

i := 0
myresponder := func(req *http.Request) (*http.Response, error) {
i++
if i == 1 {
jsonresp, _ := httpmock.NewJsonResponse(409, ErrorResponse{Message: "some random msg"})
res := new(http.Response)
*res = *jsonresp
res.Request = req
return res, nil
}
jsonresp, _ := httpmock.NewJsonResponse(200, ConnectorResponse{Name: "test"})
res := new(http.Response)
*res = *jsonresp
res.Request = req
return res, nil
}

httpmock.RegisterResponder("GET", "http://randomurl/connectors/test", myresponder)
}

//Act
connector, err := client.GetConnector(ConnectorRequest{Name: "test"})

assert.Equal(t, "test", connector.Name)
assert.NoError(t, err)
}
9 changes: 8 additions & 1 deletion lib/connectors/highlevel_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type HighLevelClient interface {
DeployMultipleConnector(connectors []CreateConnectorRequest) (err error)
SetInsecureSSL()
SetDebug()
SetParallelism(value int)
}

type highLevelClient struct {
Expand All @@ -44,6 +45,12 @@ func NewClient(url string) HighLevelClient {
return &highLevelClient{client: newBaseClient(url), maxParallelRequest: 3}
}

//Set the limit of parallel call to kafka-connect server
//Default to 3
func (c *highLevelClient) SetParallelism(value int) {
c.maxParallelRequest = value
}

func (c *highLevelClient) SetInsecureSSL() {
c.client.SetInsecureSSL()
}
Expand Down Expand Up @@ -302,7 +309,7 @@ func (c *highLevelClient) DeployMultipleConnector(connectors []CreateConnectorRe
if newErr != nil {
errSync.Lock()
defer errSync.Unlock()
multierror.Append(err, newErr)
err = multierror.Append(err, errors.Wrapf(newErr, "error while deploying: %v", req.Name))
}
}(connector)
}
Expand Down
21 changes: 21 additions & 0 deletions lib/connectors/highlevel_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package connectors

import (
"github.com/pkg/errors"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -190,3 +191,23 @@ func Test_DeployMultipleConnector_Ok(t *testing.T) {
assert.Equal(t, map[string]interface{}{"test1": true, "test2": true, "test3": true, "test4": true, "test5": true}, received)
assert.NoError(t, err)
}

func Test_DeployMultipleConnector_Error(t *testing.T) {
client := &highLevelClient{client: &MockBaseClient{}, maxParallelRequest: 2}

// Don't want to mock every baseClient call, so I am going the lazy way.
patch := monkey.PatchInstanceMethod(reflect.TypeOf(client), "DeployConnector", func(_ *highLevelClient, req CreateConnectorRequest) (err error) {
return errors.New("random error")
})
defer patch.Restore()

err := client.DeployMultipleConnector([]CreateConnectorRequest{
{ConnectorRequest: ConnectorRequest{Name: "test1"}},
{ConnectorRequest: ConnectorRequest{Name: "test2"}},
{ConnectorRequest: ConnectorRequest{Name: "test3"}},
{ConnectorRequest: ConnectorRequest{Name: "test4"}},
{ConnectorRequest: ConnectorRequest{Name: "test5"}},
})

assert.Error(t, err)
}

0 comments on commit a3b9cb6

Please sign in to comment.