diff --git a/go.mod b/go.mod index bd337ce6..2f0e0982 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/vesoft-inc/nebula-importer require ( + github.com/cenkalti/backoff/v4 v4.1.3 github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index fa51b84a..66a36655 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/pkg/client/clientpool.go b/pkg/client/clientpool.go index 3290d86c..59664a78 100644 --- a/pkg/client/clientpool.go +++ b/pkg/client/clientpool.go @@ -6,12 +6,21 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" nebula "github.com/vesoft-inc/nebula-go/v3" "github.com/vesoft-inc/nebula-importer/pkg/base" "github.com/vesoft-inc/nebula-importer/pkg/config" "github.com/vesoft-inc/nebula-importer/pkg/logger" ) +const ( + DefaultRetryInitialInterval = time.Second + DefaultRetryRandomizationFactor = 0.1 + DefaultRetryMultiplier = 1.5 + DefaultRetryMaxInterval = 2 * time.Minute + DefaultRetryMaxElapsedTime = time.Hour +) + type ClientPool struct { retry int concurrency int @@ -171,15 +180,62 @@ func (p *ClientPool) startWorker(i int) { now := time.Now() - var err error = nil - var resp *nebula.ResultSet = nil - for retry := p.retry; retry > 0; retry-- { + exp := backoff.NewExponentialBackOff() + exp.InitialInterval = DefaultRetryInitialInterval + exp.RandomizationFactor = DefaultRetryRandomizationFactor + exp.Multiplier = DefaultRetryMultiplier + exp.MaxInterval = DefaultRetryMaxInterval + exp.MaxElapsedTime 
= DefaultRetryMaxElapsedTime + + var ( + err error + resp *nebula.ResultSet + retry = p.retry + ) + + // A failed statement falls into one of three retry cases: + // * Case 1: retry no more (syntax or semantic error) + // * Case 2: retry as much as possible (raft buffer is full) + // * Case 3: retry a limited number of times (any other failure, bounded by p.retry) + _ = backoff.Retry(func() error { resp, err = p.Sessions[i].Execute(data.Stmt) if err == nil && resp.IsSucceed() { - break + return nil } - time.Sleep(1 * time.Second) - } + retryErr := err + if resp != nil { + errorCode, errorMsg := resp.GetErrorCode(), resp.GetErrorMsg() + retryErr = fmt.Errorf("%d:%s", errorCode, errorMsg) + + // Case 1: retry no more, syntax and semantic errors are treated as permanent + var isPermanentError = true + switch errorCode { + case nebula.ErrorCode_E_SYNTAX_ERROR: + case nebula.ErrorCode_E_SEMANTIC_ERROR: + default: + isPermanentError = false + } + if isPermanentError { + // stop the retry by returning a backoff.Permanent error + return backoff.Permanent(retryErr) + } + + // Case 2: retry as much as possible, resetting the limited retry counter + // TODO: compare with E_RAFT_BUFFER_OVERFLOW + // Cannot get the E_RAFT_BUFFER_OVERFLOW inside storage now, so match on the error message. + if strings.Contains(errorMsg, "raft buffer is full") { + retry = p.retry + return retryErr + } + } + // Case 3: retry a limited number of times, counted down from p.retry + if retry <= 0 { + // counter exhausted, stop the retry + return backoff.Permanent(retryErr) + } + retry-- + return retryErr + }, exp) if err != nil { err = fmt.Errorf("Client %d fail to execute: %s, Error: %s", i, data.Stmt, err.Error()) diff --git a/pkg/picker/config.go b/pkg/picker/config.go index 20134515..05262809 100644 --- a/pkg/picker/config.go +++ b/pkg/picker/config.go @@ -60,13 +60,13 @@ func (c *Config) Build() (Picker, error) { var converters []Converter - if !nullHandled && c.Nullable != nil { - converters = append(converters, NullableConverter{ - Nullable: c.Nullable, - }) - } - if c.Nullable != nil { + if !nullHandled { + converters = append(converters, NullableConverter{ + Nullable: c.Nullable, + }) + } + if c.DefaultValue != nil { converters = append(converters, DefaultConverter{ Value: *c.DefaultValue,