Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
fix: scanner.next() won't automatically recover when server side not …
Browse files Browse the repository at this point in the history
…return ERR_OK (#86)
  • Loading branch information
foreverneverer authored May 8, 2021
1 parent 50cb769 commit 1c10864
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 22 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/cenkalti/backoff/v4 v4.1.0
github.com/fortytw2/leaktest v1.3.0
github.com/pegasus-kv/thrift v0.13.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw=
github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
3 changes: 2 additions & 1 deletion idl/admin/admin-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/cmd/cmd-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/cmd/cmd.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/radmin/radmin-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/radmin/radmin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/replication/replication-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/replication/replication.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/rrdb/rrdb-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/rrdb/rrdb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions pegasus/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package pegasus
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"

"github.com/XiaoMi/pegasus-go-client/idl/base"
"github.com/XiaoMi/pegasus-go-client/idl/rrdb"
"github.com/XiaoMi/pegasus-go-client/session"
"github.com/agiledragon/gomonkey"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -191,6 +196,85 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) {
clearDatabase(t, tb)
}

// ScanRpcErrorForTest is a stand-in for (*session.ReplicaSession).Scan that is
// installed with gomonkey in tests. It ignores all of its arguments and fails
// the call with base.ERR_INVALID_STATE, returning no response.
func ScanRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) {
	var noResponse *rrdb.ScanResponse
	return noResponse, base.ERR_INVALID_STATE
}

// ScanUnknownErrorForTest is a stand-in for (*session.ReplicaSession).Scan that
// is installed with gomonkey in tests. It ignores all of its arguments and
// reports a successful call (nil error) whose response carries Error = -4.
func ScanUnknownErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) {
	failedResponse := &rrdb.ScanResponse{Error: -4}
	return failedResponse, nil
}

// TestPegasusTableConnector_ScanFailRecover checks how a scanner behaves after
// a failed Next() call. (*session.ReplicaSession).Scan is temporarily replaced
// with gomonkey to inject two kinds of failure:
//
//   - ScanUnknownErrorForTest: the call succeeds but the response carries an
//     error code. Only the Next() issued before the patch is expected to
//     succeed; all following calls are expected to fail.
//   - ScanRpcErrorForTest: the call itself fails. Following Next() calls are
//     expected to recover, so all 100 records are eventually read.
func TestPegasusTableConnector_ScanFailRecover(t *testing.T) {
	defer leaktest.Check(t)()

	client := NewClient(testingCfg)
	defer client.Close()

	tb, err := client.OpenTable(context.Background(), "temp")
	assert.Nil(t, err)
	defer tb.Close()

	for i := 0; i < 100; i++ {
		err := tb.Set(context.Background(), []byte("h1"), []byte(fmt.Sprint(i)), []byte("hello world"))
		assert.Nil(t, err)
	}

	opts := NewScanOptions()
	opts.BatchSize = 1

	// gomonkey only needs the type of the receiver; the variable is named so
	// that it does not shadow the imported session package.
	replicaSessionType := reflect.TypeOf(&session.ReplicaSession{})

	// gomonkey patches are process-wide. Make sure the last one is removed on
	// every exit path so that it cannot leak into other tests.
	var mock *gomonkey.Patches
	defer func() {
		if mock != nil {
			mock.Reset()
		}
	}()

	// Case 1: unknown server-side error.
	mockUnknownErrorTable, err := client.OpenTable(context.Background(), "temp")
	assert.Nil(t, err)
	defer mockUnknownErrorTable.Close()
	scanner, err := mockUnknownErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
	assert.Nil(t, err)
	unknownErrorMocked := false
	successCount := 0
	for i := 0; i < 100; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		_, _, _, _, nextErr := scanner.Next(ctx)
		if nextErr == nil {
			successCount++
		}
		// Inject the unknown error exactly once (after the first Next), then
		// restore the real Scan. All following requests are still expected to
		// fail.
		if !unknownErrorMocked {
			mock = gomonkey.ApplyMethod(replicaSessionType, "Scan", ScanUnknownErrorForTest)
			unknownErrorMocked = true
		} else {
			mock.Reset()
		}
		cancel()
	}
	assert.Equal(t, 1, successCount)

	// Case 2: RPC error.
	mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp")
	assert.Nil(t, err)
	defer mockRpcFailedErrorTable.Close()
	scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
	assert.Nil(t, err)
	rpcFailedMocked := false
	successCount = 0
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		complete, _, _, _, nextErr := scanner.Next(ctx)
		// Inject the RPC error exactly once (after the first Next), then
		// restore the real Scan. Following requests are expected to recover
		// automatically.
		if !rpcFailedMocked {
			mock = gomonkey.ApplyMethod(replicaSessionType, "Scan", ScanRpcErrorForTest)
			rpcFailedMocked = true
		} else {
			mock.Reset()
		}
		cancel()
		if complete {
			break
		}
		if nextErr == nil {
			successCount++
		}
	}
	assert.Equal(t, 100, successCount)
	clearDatabase(t, tb)
}

func TestPegasusTableConnector_ScanWithFilter(t *testing.T) {
defer leaktest.Check(t)()

Expand Down
33 changes: 20 additions & 13 deletions pegasus/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type ScannerOptions struct {
// Values stored in pegasusScanner.batchStatus.
const (
batchScanning = 0
batchScanFinished = -1 // The scanner's batch is finished; clean it up and switch to the status batchEmpty.
batchEmpty = -2
batchError = -3
batchEmpty = -2 // The scan context has been removed.
batchRpcError = -3 // RPC error, including ERR_SESSION_RESET, ERR_OBJECT_NOT_FOUND, ERR_INVALID_STATE and ERR_TIMEOUT.
batchUnknownError = -4 // The RPC succeeded, but the operation encountered an unknown error on the server side.
)

// Scanner defines the interface of client-side scanning.
Expand Down Expand Up @@ -108,8 +109,13 @@ func newPegasusScannerForUnorderedScanners(table *pegasusTableConnector, gpidSli

func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []byte,
sortKey []byte, value []byte, err error) {
if p.batchStatus == batchError {
err = fmt.Errorf("last Next() failed")
if p.batchStatus == batchUnknownError {
err = fmt.Errorf("last Next() encounter unknow error, please retry after resloving it manually")
return
}
if p.batchStatus == batchRpcError {
err = fmt.Errorf("last Next() encounter rpc error, it may recover after next loop")
p.batchStatus = batchScanning
return
}
if p.closed {
Expand All @@ -129,10 +135,6 @@ func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []by
return p.doNext(ctx)
}()

if err != nil {
p.batchStatus = batchError
}

err = WrapError(err, OpNext)
return
}
Expand Down Expand Up @@ -212,11 +214,17 @@ func (p *pegasusScanner) nextBatch(ctx context.Context) (completed bool, hashKey
request := &rrdb.ScanRequest{ContextID: p.batchStatus}
part := p.table.getPartitionByGpid(p.curGpid)
response, err := part.Scan(ctx, p.curGpid, request)
if err != nil {
p.batchStatus = batchRpcError
if updateConfig, errHandler := p.table.handleReplicaError(err, part); errHandler != nil {
err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler)
}
return
}
err = p.onRecvScanResponse(response, err)
if err == nil {
return p.doNext(ctx)
}

return
}

Expand All @@ -239,14 +247,13 @@ func (p *pegasusScanner) onRecvScanResponse(response *rrdb.ScanResponse, err err
p.batchStatus = batchEmpty
} else {
// rpc succeed, but operation encounter some error in server side
p.batchStatus = batchUnknownError
return base.NewRocksDBErrFromInt(response.Error)
}
} else {
// rpc failed
return fmt.Errorf("scan failed with error:" + err.Error())
return nil
}

return nil
return fmt.Errorf("scan failed with error:" + err.Error())
}

func (p *pegasusScanner) Close() error {
Expand Down

0 comments on commit 1c10864

Please sign in to comment.