diff --git a/go.mod b/go.mod index 25dedec..ed676b0 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/BurntSushi/toml v0.3.1 // indirect + github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/cenkalti/backoff/v4 v4.1.0 github.com/fortytw2/leaktest v1.3.0 github.com/pegasus-kv/thrift v0.13.0 diff --git a/go.sum b/go.sum index 578e487..4c1b579 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw= +github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/idl/admin/admin-consts.go b/idl/admin/admin-consts.go index c681065..28041e4 100644 --- a/idl/admin/admin-consts.go +++ b/idl/admin/admin-consts.go @@ -7,9 +7,10 @@ import ( "bytes" "context" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/cmd/cmd-consts.go b/idl/cmd/cmd-consts.go index 1e7e8bd..5fdc03c 100644 --- a/idl/cmd/cmd-consts.go +++ b/idl/cmd/cmd-consts.go @@ -7,8 +7,9 @@ import ( "bytes" "context" "fmt" - "github.com/pegasus-kv/thrift/lib/go/thrift" "reflect" + + "github.com/pegasus-kv/thrift/lib/go/thrift" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/cmd/cmd.go b/idl/cmd/cmd.go index b8541e7..2e09a2d 100644 --- a/idl/cmd/cmd.go +++ b/idl/cmd/cmd.go @@ -7,8 +7,9 @@ import ( "bytes" "context" "fmt" - "github.com/pegasus-kv/thrift/lib/go/thrift" "reflect" + + "github.com/pegasus-kv/thrift/lib/go/thrift" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/radmin/radmin-consts.go b/idl/radmin/radmin-consts.go index 159af7d..301ec77 100644 --- a/idl/radmin/radmin-consts.go +++ b/idl/radmin/radmin-consts.go @@ -7,9 +7,10 @@ import ( "bytes" "context" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/radmin/radmin.go b/idl/radmin/radmin.go index 2a5a0ba..19538a4 100644 --- a/idl/radmin/radmin.go +++ b/idl/radmin/radmin.go @@ -7,9 +7,10 @@ import ( "bytes" "context" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/replication/replication-consts.go b/idl/replication/replication-consts.go index cebd74e..3d31925 100644 --- a/idl/replication/replication-consts.go +++ b/idl/replication/replication-consts.go @@ -7,9 +7,10 @@ import ( "bytes" "context" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/replication/replication.go b/idl/replication/replication.go index 1e0df68..bd25ba5 100644 --- a/idl/replication/replication.go +++ b/idl/replication/replication.go @@ -7,9 +7,10 @@ import ( "bytes" "context" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/rrdb/rrdb-consts.go b/idl/rrdb/rrdb-consts.go index 7e08a4e..8d73174 100644 --- a/idl/rrdb/rrdb-consts.go +++ b/idl/rrdb/rrdb-consts.go @@ -7,10 +7,11 @@ import ( "bytes" "context" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/XiaoMi/pegasus-go-client/idl/replication" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/idl/rrdb/rrdb.go b/idl/rrdb/rrdb.go index 89ea94d..a548fa3 100644 --- a/idl/rrdb/rrdb.go +++ b/idl/rrdb/rrdb.go @@ -9,10 +9,11 @@ import ( "database/sql/driver" "errors" "fmt" + "reflect" + "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/XiaoMi/pegasus-go-client/idl/replication" "github.com/pegasus-kv/thrift/lib/go/thrift" - "reflect" ) // (needed to ensure safety because of naive import list construction.) diff --git a/pegasus/scan_test.go b/pegasus/scan_test.go index 68c3993..6038d36 100644 --- a/pegasus/scan_test.go +++ b/pegasus/scan_test.go @@ -3,10 +3,15 @@ package pegasus import ( "context" "fmt" + "reflect" "sync" "testing" "time" + "github.com/XiaoMi/pegasus-go-client/idl/base" + "github.com/XiaoMi/pegasus-go-client/idl/rrdb" + "github.com/XiaoMi/pegasus-go-client/session" + "github.com/agiledragon/gomonkey" "github.com/fortytw2/leaktest" "github.com/stretchr/testify/assert" ) @@ -191,6 +196,85 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) { clearDatabase(t, tb) } +func ScanRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) { + return nil, base.ERR_INVALID_STATE +} + +func ScanUnknownErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) { + return &rrdb.ScanResponse{Error: -4}, nil +} + +func TestPegasusTableConnector_ScanFailRecover(t *testing.T) { + defer leaktest.Check(t)() + + client := NewClient(testingCfg) + defer client.Close() + + tb, err := client.OpenTable(context.Background(), "temp") + assert.Nil(t, err) + defer tb.Close() + + for i := 0; i < 100; i++ { + err := tb.Set(context.Background(), []byte("h1"), []byte(fmt.Sprint(i)), []byte("hello world")) + assert.Nil(t, err) + } + + opts := NewScanOptions() + opts.BatchSize = 1 + var session = &session.ReplicaSession{} + // test unknown error + mockUnknownErrorTable, err := client.OpenTable(context.Background(), "temp") + assert.Nil(t, err) + defer tb.Close() + scanner, _ := mockUnknownErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts) + unknownErrorMocked := false + successCount := 0 + var mock *gomonkey.Patches + for i := 0; i < 100; i++ { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + _, _, _, _, error := scanner.Next(ctx) + if error == nil { + successCount++ + } + // only mock unknown error, all the follow request will be failed + if !unknownErrorMocked { + mock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanUnknownErrorForTest) + unknownErrorMocked = true + } else { + mock.Reset() + } + cancel() + } + assert.Equal(t, 1, successCount) + + mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp") + assert.Nil(t, err) + defer tb.Close() + scanner, _ = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts) + rpcFailedMocked := false + successCount = 0 + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + complete, _, _, _, error := scanner.Next(ctx) + // mock rpc error, follow request will be recovered automatically + if !rpcFailedMocked { + mock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest) + rpcFailedMocked = true + } else { + mock.Reset() + } + cancel() + if complete { + break + } + if error == nil { + successCount++ + } + } + assert.Equal(t, 100, successCount) + clearDatabase(t, tb) +} + func TestPegasusTableConnector_ScanWithFilter(t *testing.T) { defer leaktest.Check(t)() diff --git a/pegasus/scanner.go b/pegasus/scanner.go index 9faa1b6..e707f74 100644 --- a/pegasus/scanner.go +++ b/pegasus/scanner.go @@ -29,8 +29,9 @@ type ScannerOptions struct { const ( batchScanning = 0 batchScanFinished = -1 // Scanner's batch is finished, clean up it and switch to the status batchEmpty - batchEmpty = -2 - batchError = -3 + batchEmpty = -2 // scan context has been removed + batchRpcError = -3 // rpc error, include ERR_SESSION_RESET,ERR_OBJECT_NOT_FOUND,ERR_INVALID_STATE, ERR_TIMEOUT + batchUnknownError = -4 // rpc succeed, but operation encounter some unknown error in server side ) // Scanner defines the interface of client-side scanning. @@ -108,8 +109,13 @@ func newPegasusScannerForUnorderedScanners(table *pegasusTableConnector, gpidSli func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []byte, sortKey []byte, value []byte, err error) { - if p.batchStatus == batchError { - err = fmt.Errorf("last Next() failed") + if p.batchStatus == batchUnknownError { + err = fmt.Errorf("last Next() encounter unknow error, please retry after resloving it manually") + return + } + if p.batchStatus == batchRpcError { + err = fmt.Errorf("last Next() encounter rpc error, it may recover after next loop") + p.batchStatus = batchScanning return } if p.closed { @@ -129,10 +135,6 @@ func (p *pegasusScanner) Next(ctx context.Context) (completed bool, hashKey []by return p.doNext(ctx) }() - if err != nil { - p.batchStatus = batchError - } - err = WrapError(err, OpNext) return } @@ -212,11 +214,17 @@ func (p *pegasusScanner) nextBatch(ctx context.Context) (completed bool, hashKey request := &rrdb.ScanRequest{ContextID: p.batchStatus} part := p.table.getPartitionByGpid(p.curGpid) response, err := part.Scan(ctx, p.curGpid, request) + if err != nil { + p.batchStatus = batchRpcError + if updateConfig, errHandler := p.table.handleReplicaError(err, part); errHandler != nil { + err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler) + } + return + } err = p.onRecvScanResponse(response, err) if err == nil { return p.doNext(ctx) } - return } @@ -239,14 +247,13 @@ func (p *pegasusScanner) onRecvScanResponse(response *rrdb.ScanResponse, err err p.batchStatus = batchEmpty } else { // rpc succeed, but operation encounter some error in server side + p.batchStatus = batchUnknownError return base.NewRocksDBErrFromInt(response.Error) } - } else { - // rpc failed - return fmt.Errorf("scan failed with error:" + err.Error()) + return nil } - return nil + return fmt.Errorf("scan failed with error:" + err.Error()) } func (p *pegasusScanner) Close() error {