diff --git a/Gopkg.lock b/Gopkg.lock index 0b4f2e9..a466bed 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,52 +2,75 @@ [[projects]] + digest = "1:a2c1d0e43bd3baaa071d1b9ed72c27d78169b2b269f71c105ac4ba34b1be4a39" name = "github.com/davecgh/go-spew" packages = ["spew"] + pruneopts = "UT" revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" [[projects]] + digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" name = "github.com/inconshreveable/mousetrap" packages = ["."] + pruneopts = "UT" revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" [[projects]] + digest = "1:cf31692c14422fa27c83a05292eb5cbe0fb2775972e8f1f8446a71549bd8980b" + name = "github.com/pkg/errors" + packages = ["."] + pruneopts = "UT" + revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4" + version = "v0.8.1" + +[[projects]] + digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe" name = "github.com/pmezard/go-difflib" packages = ["difflib"] + pruneopts = "UT" revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" [[projects]] + digest = "1:7ffc0983035bc7e297da3688d9fe19d60a420e9c38bef23f845c53788ed6a05e" name = "github.com/spf13/cobra" packages = ["."] + pruneopts = "UT" revision = "7b2c5ac9fc04fc5efafb60700713d4fa609b777b" version = "v0.0.1" [[projects]] + digest = "1:1b21a2b4058a779f290c7341cd93267492e0ecea6c8b54f64a4a5fd7ff131034" name = "github.com/spf13/pflag" packages = ["."] + pruneopts = "UT" revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66" version = "v1.0.0" [[projects]] + digest = "1:25f2747b063c0a656195ef85413cef8c9f2bbe128deab7d39563a6ca1e536070" name = "github.com/stretchr/testify" packages = ["assert"] + pruneopts = "UT" revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0" version = "v1.1.4" [[projects]] branch = "master" + digest = "1:a412caf33b5d3c7938c28e1e745c10664bd180277b73ab3b34c0c467886f9969" name = "golang.org/x/net" packages = [ "idna", - "publicsuffix" + "publicsuffix", ] + pruneopts = "UT" revision = "8a410e7b638dca158bf9e766925842f6651ff828" [[projects]] branch = "master" + digest = "1:d42da4dfab111ddef8bfe03540cfaa9060137ae2ed995c7dfdaf03bb5cff54a7" name = "golang.org/x/text" packages = [ "collate", @@ -63,19 +86,27 @@ "unicode/bidi", "unicode/cldr", "unicode/norm", - "unicode/rangetable" + "unicode/rangetable", ] + pruneopts = "UT" revision = "88f656faf3f37f690df1a32515b479415e1a6769" [[projects]] + digest = "1:ce431d26276fb2227ae64ad2f7d980e8da9126c93979d9c83f413c36215e7031" name = "gopkg.in/resty.v1" packages = ["."] - revision = "d4920dcf5b7689548a6db640278a9b35a5b48ec6" - version = "v1.9.1" + pruneopts = "UT" + revision = "03c09fa32a21b7b27b8dbb3877826c1ab3d2daa2" + version = "v1.11.0" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "fa561d577c2c2abaad1a0302f527405fe8f3fbea06637fd9f4ca19603cfb7d71" + input-imports = [ + "github.com/pkg/errors", + "github.com/spf13/cobra", + "github.com/stretchr/testify/assert", + "gopkg.in/resty.v1", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 196c706..500b1a6 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -37,6 +37,11 @@ name = "github.com/stretchr/testify" version = "1.1.4" + +[[constraint]] + name = "gopkg.in/resty.v1" + version = "1.11.0" + [prune] go-tests = true unused-packages = true diff --git a/lib/connectors/client.go b/lib/connectors/client.go index e07165a..78cdfa5 100644 --- a/lib/connectors/client.go +++ b/lib/connectors/client.go @@ -2,8 +2,6 @@ package connectors import ( "crypto/tls" - "encoding/json" - "errors" "fmt" "gopkg.in/resty.v1" "time" @@ -27,24 +25,14 @@ func (err ErrorResponse) Error() string { //NewClient generates a new client func NewClient(url string) *Client { restClient := resty.New(). - OnAfterResponse(func(c *resty.Client, res *resty.Response) error { - // The default error handling given by `SetRESTMode` is a bit weak. This is the override - - if res.StatusCode() >= 400 && res.StatusCode() != 404 { - restErr := ErrorResponse{} - decodeErr := json.Unmarshal(res.Body(), &restErr) - if decodeErr != nil { - return restErr - } - return errors.New(fmt.Sprintf("Error while decoding body while error: %v", res.Body())) - } - return nil - }). - SetRESTMode(). + SetError(ErrorResponse{}). SetHostURL(url). SetHeader("Accept", "application/json"). SetRetryCount(3). - SetTimeout(5 * time.Second) + SetTimeout(5 * time.Second). + AddRetryCondition(func(resp *resty.Response) (bool, error) { + return resp.StatusCode() == 409, nil + }) return &Client{restClient: restClient} } diff --git a/lib/connectors/connector_integration_test.go b/lib/connectors/connector_integration_test.go index 131ea33..b16af78 100644 --- a/lib/connectors/connector_integration_test.go +++ b/lib/connectors/connector_integration_test.go @@ -28,7 +28,6 @@ func TestCreateConnector(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-create-connector"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -40,6 +39,23 @@ func TestCreateConnector(t *testing.T) { assert.Equal(t, 201, resp.Code) } +func TestErrorCode(t *testing.T) { + client := NewClient(hostConnect) + _, err := client.CreateConnector( + CreateConnectorRequest{ + ConnectorRequest: ConnectorRequest{Name: "not-a-valid-connector"}, + Config: map[string]interface{}{ + "connector.class": "not a valid connector class", + "file": testFile, + "topic": "connect-test", + }, + }, + true, + ) + + assert.Error(t, err) +} + func TestGetConnector(t *testing.T) { client := NewClient(hostConnect) _, err := client.CreateConnector( @@ -47,7 +63,6 @@ func TestGetConnector(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-get-connector"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -75,7 +90,6 @@ func TestGetAllConnectors(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-get-all-connectors"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -166,7 +180,6 @@ func TestDeleteConnector(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-delete-connectors"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -262,7 +275,6 @@ func TestGetConnectorStatus(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-get-connector-status"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -289,7 +301,6 @@ func TestRestartConnector(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-restart-connector"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -314,7 +325,6 @@ func TestPauseAndResumeConnector(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-pause-and-resume-connector"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, @@ -354,7 +364,6 @@ func TestRestartTask(t *testing.T) { ConnectorRequest: ConnectorRequest{Name: "test-restart-task"}, Config: map[string]interface{}{ "connector.class": "FileStreamSource", - "tasks.max": "1", "file": testFile, "topic": "connect-test", }, diff --git a/lib/connectors/connectors.go b/lib/connectors/connectors.go index 7ac4409..df9e823 100644 --- a/lib/connectors/connectors.go +++ b/lib/connectors/connectors.go @@ -64,6 +64,9 @@ func (c *Client) GetAll() (GetAllConnectorsResponse, error) { if err != nil { return GetAllConnectorsResponse{}, err } + if resp.Error() != nil { + return GetAllConnectorsResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() result.Connectors = connectors @@ -82,6 +85,9 @@ func (c Client) GetConnector(req ConnectorRequest) (ConnectorResponse, error) { if err != nil { return ConnectorResponse{}, err } + if resp.Error() != nil && resp.StatusCode() != 404 { + return ConnectorResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() return result, nil @@ -98,6 +104,9 @@ func (c *Client) CreateConnector(req CreateConnectorRequest, sync bool) (Connect if err != nil { return ConnectorResponse{}, err } + if resp.Error() != nil { + return ConnectorResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() @@ -128,6 +137,9 @@ func (c Client) UpdateConnector(req CreateConnectorRequest, sync bool) (Connecto if err != nil { return ConnectorResponse{}, err } + if resp.Error() != nil { + return ConnectorResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() @@ -157,6 +169,9 @@ func (c Client) DeleteConnector(req ConnectorRequest, sync bool) (EmptyResponse, if err != nil { return EmptyResponse{}, err } + if resp.Error() != nil { + return EmptyResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() @@ -187,6 +202,9 @@ func (c Client) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResp if err != nil { return GetConnectorConfigResponse{}, err } + if resp.Error() != nil && resp.StatusCode() != 404 { + return GetConnectorConfigResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() result.Config = config @@ -204,6 +222,9 @@ func (c Client) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResp if err != nil { return GetConnectorStatusResponse{}, err } + if resp.Error() != nil && resp.StatusCode() != 404 { + return GetConnectorStatusResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() return result, nil @@ -220,6 +241,9 @@ func (c Client) RestartConnector(req ConnectorRequest) (EmptyResponse, error) { if err != nil { return EmptyResponse{}, err } + if resp.Error() != nil { + return EmptyResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() return result, nil @@ -237,6 +261,9 @@ func (c Client) PauseConnector(req ConnectorRequest, sync bool) (EmptyResponse, if err != nil { return EmptyResponse{}, err } + if resp.Error() != nil { + return EmptyResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() @@ -266,6 +293,9 @@ func (c Client) ResumeConnector(req ConnectorRequest, sync bool) (EmptyResponse, if err != nil { return EmptyResponse{}, err } + if resp.Error() != nil { + return EmptyResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() @@ -322,7 +352,8 @@ func convertConfigValueToString(value interface{}) string { } } -//TryUntil repeats the request +// TryUntil repeats exec until it return true or timeout is reached +// TryUntil itself return true if `exec` has return true (success), false if timeout (failure) func TryUntil(exec func() bool, limit time.Duration) bool { timeLimit := time.After(limit) diff --git a/lib/connectors/tasks.go b/lib/connectors/tasks.go index e67f063..8643a82 100644 --- a/lib/connectors/tasks.go +++ b/lib/connectors/tasks.go @@ -53,6 +53,9 @@ func (c Client) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) { if err != nil { return GetAllTasksResponse{}, err } + if resp.Error() != nil { + return GetAllTasksResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() return result, nil @@ -69,6 +72,9 @@ func (c Client) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) { if err != nil { return TaskStatusResponse{}, err } + if resp.Error() != nil && resp.StatusCode() != 404 { + return TaskStatusResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() @@ -86,6 +92,9 @@ func (c Client) RestartTask(req TaskRequest) (EmptyResponse, error) { if err != nil { return EmptyResponse{}, err } + if resp.Error() != nil { + return EmptyResponse{}, resp.Error().(*ErrorResponse) + } result.Code = resp.StatusCode() diff --git a/lib/docker-compose.yml b/lib/docker-compose.yml index 157d4dd..57ab1b5 100644 --- a/lib/docker-compose.yml +++ b/lib/docker-compose.yml @@ -16,4 +16,6 @@ services: - DISABLE_JMX=1 - DEBUG=1 - SUPERVISORWEB=0 - - CONNECTORS=file \ No newline at end of file + - CONNECTORS=file + +# you will find config in '/var/run/{SERVICE_NAME}' folder inside container \ No newline at end of file