Skip to content

Commit

Permalink
feat: retry by cases and increment retry interval (#252)
Browse files Browse the repository at this point in the history
* feat: retry by cases and increment retry interval

* fix: add comment for E_RAFT_BUFFER_OVERFLOW

Co-authored-by: Yee <[email protected]>
  • Loading branch information
veezhang and yixinglu authored Dec 16, 2022
1 parent 9a04c22 commit 265cadb
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/vesoft-inc/nebula-importer

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
68 changes: 62 additions & 6 deletions pkg/client/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ import (
"strings"
"time"

"github.com/cenkalti/backoff/v4"
nebula "github.com/vesoft-inc/nebula-go/v3"
"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/config"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
)

const (
DefaultRetryInitialInterval = time.Second
DefaultRetryRandomizationFactor = 0.1
DefaultRetryMultiplier = 1.5
DefaultRetryMaxInterval = 2 * time.Minute
DefaultRetryMaxElapsedTime = time.Hour
)

type ClientPool struct {
retry int
concurrency int
Expand Down Expand Up @@ -171,15 +180,62 @@ func (p *ClientPool) startWorker(i int) {

now := time.Now()

var err error = nil
var resp *nebula.ResultSet = nil
for retry := p.retry; retry > 0; retry-- {
exp := backoff.NewExponentialBackOff()
exp.InitialInterval = DefaultRetryInitialInterval
exp.RandomizationFactor = DefaultRetryRandomizationFactor
exp.Multiplier = DefaultRetryMultiplier
exp.MaxInterval = DefaultRetryMaxInterval
exp.MaxElapsedTime = DefaultRetryMaxElapsedTime

var (
err error
resp *nebula.ResultSet
retry = p.retry
)

// There are three cases of retry
// * Case 1: retry no more
// * Case 2. retry as much as possible
// * Case 3: retry with limit times
_ = backoff.Retry(func() error {
resp, err = p.Sessions[i].Execute(data.Stmt)
if err == nil && resp.IsSucceed() {
break
return nil
}
time.Sleep(1 * time.Second)
}
retryErr := err
if resp != nil {
errorCode, errorMsg := resp.GetErrorCode(), resp.GetErrorMsg()
retryErr = fmt.Errorf("%d:%s", errorCode, errorMsg)

// Case 1: retry no more
var isPermanentError = true
switch errorCode {
case nebula.ErrorCode_E_SYNTAX_ERROR:
case nebula.ErrorCode_E_SEMANTIC_ERROR:
default:
isPermanentError = false
}
if isPermanentError {
// stop the retry
return backoff.Permanent(retryErr)
}

// Case 2. retry as much as possible
// TODO: compare with E_RAFT_BUFFER_OVERFLOW
// Can not get the E_RAFT_BUFFER_OVERFLOW inside storage now.
if strings.Contains(errorMsg, "raft buffer is full") {
retry = p.retry
return retryErr
}
}
// Case 3: retry with limit times
if retry <= 0 {
// stop the retry
return backoff.Permanent(retryErr)
}
retry--
return retryErr
}, exp)

if err != nil {
err = fmt.Errorf("Client %d fail to execute: %s, Error: %s", i, data.Stmt, err.Error())
Expand Down
12 changes: 6 additions & 6 deletions pkg/picker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ func (c *Config) Build() (Picker, error) {

var converters []Converter

if !nullHandled && c.Nullable != nil {
converters = append(converters, NullableConverter{
Nullable: c.Nullable,
})
}

if c.Nullable != nil {
if !nullHandled {
converters = append(converters, NullableConverter{
Nullable: c.Nullable,
})
}

if c.DefaultValue != nil {
converters = append(converters, DefaultConverter{
Value: *c.DefaultValue,
Expand Down

0 comments on commit 265cadb

Please sign in to comment.