Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

fix: scanner.next() won't automatically recover when server side not return ERR_OK #86

Merged
merged 8 commits into from
May 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/cenkalti/backoff/v4 v4.1.0
github.com/fortytw2/leaktest v1.3.0
github.com/pegasus-kv/thrift v0.13.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw=
github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
3 changes: 2 additions & 1 deletion idl/admin/admin-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/cmd/cmd-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/cmd/cmd.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/radmin/radmin-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/radmin/radmin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/replication/replication-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/replication/replication.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/rrdb/rrdb-consts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion idl/rrdb/rrdb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions pegasus/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package pegasus
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"

"github.com/XiaoMi/pegasus-go-client/idl/base"
"github.com/XiaoMi/pegasus-go-client/idl/rrdb"
"github.com/XiaoMi/pegasus-go-client/session"
"github.com/agiledragon/gomonkey"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -191,6 +196,85 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) {
clearDatabase(t, tb)
}

// ScanRpcErrorForTest has the shape of the ReplicaSession "Scan" method
// (receiver passed as the first argument) so it can be installed in its
// place with gomonkey. It ignores its arguments and reports an rpc-level
// failure: no response and base.ERR_INVALID_STATE as the error.
func ScanRpcErrorForTest(
	_ *session.ReplicaSession,
	ctx context.Context,
	gpid *base.Gpid,
	request *rrdb.ScanRequest,
) (*rrdb.ScanResponse, error) {
	// An rpc failure carries no response at all.
	var noResponse *rrdb.ScanResponse
	return noResponse, base.ERR_INVALID_STATE
}

// ScanUnknownErrorForTest has the shape of the ReplicaSession "Scan" method
// (receiver passed as the first argument) so it can be installed in its
// place with gomonkey. It ignores its arguments and simulates an rpc that
// succeeds (nil error) while the response itself carries a non-zero Error
// code (-4).
func ScanUnknownErrorForTest(
	_ *session.ReplicaSession,
	ctx context.Context,
	gpid *base.Gpid,
	request *rrdb.ScanRequest,
) (*rrdb.ScanResponse, error) {
	failedResponse := rrdb.ScanResponse{Error: -4}
	return &failedResponse, nil
}

// TestPegasusTableConnector_ScanFailRecover checks how a scanner behaves after
// a Next() call fails, by temporarily patching ReplicaSession.Scan:
//
//   - when the server answers with an error code (rpc succeeded, operation
//     failed), only the Next() issued before the patch succeeds and every
//     later Next() keeps failing;
//   - when the rpc itself fails, the scanner recovers on its own and all 100
//     records are eventually returned.
//
// The test needs a reachable cluster (testingCfg) with a table named "temp".
func TestPegasusTableConnector_ScanFailRecover(t *testing.T) {
	defer leaktest.Check(t)()

	client := NewClient(testingCfg)
	defer client.Close()

	tb, err := client.OpenTable(context.Background(), "temp")
	if !assert.Nil(t, err) {
		return
	}
	defer tb.Close()

	for i := 0; i < 100; i++ {
		err := tb.Set(context.Background(), []byte("h1"), []byte(fmt.Sprint(i)), []byte("hello world"))
		assert.Nil(t, err)
	}

	opts := NewScanOptions()
	// One record per batch, so that every Next() has to issue a Scan rpc.
	opts.BatchSize = 1

	// Type whose "Scan" method is patched. Kept under a name that does not
	// shadow the imported session package.
	replicaSessionType := reflect.TypeOf(&session.ReplicaSession{})
	var mock *gomonkey.Patches

	// test unknown error
	mockUnknownErrorTable, err := client.OpenTable(context.Background(), "temp")
	if !assert.Nil(t, err) {
		return
	}
	defer mockUnknownErrorTable.Close()
	scanner, err := mockUnknownErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
	if !assert.Nil(t, err) {
		return
	}
	unknownErrorMocked := false
	successCount := 0
	for i := 0; i < 100; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		_, _, _, _, scanErr := scanner.Next(ctx)
		cancel()
		if scanErr == nil {
			successCount++
		}
		// Patch Scan after the first Next() so that the second one receives an
		// unknown error. The patch is removed afterwards, yet every following
		// request is still expected to fail.
		if !unknownErrorMocked {
			mock = gomonkey.ApplyMethod(replicaSessionType, "Scan", ScanUnknownErrorForTest)
			unknownErrorMocked = true
		} else {
			mock.Reset()
		}
	}
	// Only the Next() issued before the patch may succeed.
	assert.Equal(t, 1, successCount)

	// test rpc error
	mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp")
	if !assert.Nil(t, err) {
		return
	}
	defer mockRpcFailedErrorTable.Close()
	scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
	if !assert.Nil(t, err) {
		return
	}
	rpcFailedMocked := false
	successCount = 0
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		complete, _, _, _, scanErr := scanner.Next(ctx)
		cancel()
		// Patch Scan after the first Next() so that the second one hits an rpc
		// error. Once the patch is removed, the following requests are
		// expected to recover automatically.
		if !rpcFailedMocked {
			mock = gomonkey.ApplyMethod(replicaSessionType, "Scan", ScanRpcErrorForTest)
			rpcFailedMocked = true
		} else {
			mock.Reset()
		}
		if complete {
			break
		}
		if scanErr == nil {
			successCount++
		}
	}
	// Despite the injected rpc failure, every record must have been scanned.
	assert.Equal(t, 100, successCount)
	clearDatabase(t, tb)
}

func TestPegasusTableConnector_ScanWithFilter(t *testing.T) {
defer leaktest.Check(t)()

Expand Down
33 changes: 20 additions & 13 deletions pegasus/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type ScannerOptions struct {
// Special values compared against pegasusScanner.batchStatus.
const (
batchScanning = 0
batchScanFinished = -1 // the scanner's current batch is finished; clean it up and switch to the status batchEmpty
batchEmpty = -2
batchError = -3
batchEmpty = -2 // the scan context has been removed
batchRpcError = -3 // the rpc itself failed, including ERR_SESSION_RESET, ERR_OBJECT_NOT_FOUND, ERR_INVALID_STATE and ERR_TIMEOUT
batchUnknownError = -4 // the rpc succeeded, but the operation encountered an unknown error on the server side
)

// Scanner defines the interface of client-side scanning.
Expand Down Expand Up @@ -108,8 +109,13 @@ func newPegasusScannerForUnorderedScanners(table *pegasusTableConnector, gpidSli

func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []byte,
sortKey []byte, value []byte, err error) {
if p.batchStatus == batchError {
err = fmt.Errorf("last Next() failed")
if p.batchStatus == batchUnknownError {
err = fmt.Errorf("last Next() encounter unknow error, please retry after resloving it manually")
return
}
if p.batchStatus == batchRpcError {
err = fmt.Errorf("last Next() encounter rpc error, it may recover after next loop")
p.batchStatus = batchScanning
return
}
if p.closed {
Expand All @@ -129,10 +135,6 @@ func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []by
return p.doNext(ctx)
}()

if err != nil {
p.batchStatus = batchError
}

err = WrapError(err, OpNext)
return
}
Expand Down Expand Up @@ -212,11 +214,17 @@ func (p *pegasusScanner) nextBatch(ctx context.Context) (completed bool, hashKey
request := &rrdb.ScanRequest{ContextID: p.batchStatus}
part := p.table.getPartitionByGpid(p.curGpid)
response, err := part.Scan(ctx, p.curGpid, request)
if err != nil {
p.batchStatus = batchRpcError
if updateConfig, errHandler := p.table.handleReplicaError(err, part); errHandler != nil {
err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler)
}
return
}
err = p.onRecvScanResponse(response, err)
if err == nil {
return p.doNext(ctx)
}

return
}

Expand All @@ -239,14 +247,13 @@ func (p *pegasusScanner) onRecvScanResponse(response *rrdb.ScanResponse, err err
p.batchStatus = batchEmpty
} else {
// rpc succeed, but operation encounter some error in server side
p.batchStatus = batchUnknownError
return base.NewRocksDBErrFromInt(response.Error)
}
} else {
// rpc failed
return fmt.Errorf("scan failed with error:" + err.Error())
return nil
}

return nil
return fmt.Errorf("scan failed with error:" + err.Error())
}

func (p *pegasusScanner) Close() error {
Expand Down