From 6ceb51bac8ed2965cccb4fb072aece5b17fb07b7 Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Mon, 8 Aug 2022 15:16:56 +0800 Subject: [PATCH] fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106) --- go-client/pegasus/scan_test.go | 78 ++++++++++++++++++++++++++++++---- go-client/pegasus/scanner.go | 8 +++- 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/go-client/pegasus/scan_test.go b/go-client/pegasus/scan_test.go index 171abdc993..d895396338 100644 --- a/go-client/pegasus/scan_test.go +++ b/go-client/pegasus/scan_test.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "reflect" + "strings" "sync" "testing" "time" @@ -219,6 +220,10 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) { clearDatabase(t, tb) } +func GetScannerRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.GetScannerRequest) (*rrdb.ScanResponse, error) { + return nil, base.ERR_INVALID_STATE +} + func ScanRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) { return nil, base.ERR_INVALID_STATE } @@ -270,28 +275,83 @@ func TestPegasusTableConnector_ScanFailRecover(t *testing.T) { } assert.Equal(t, 1, successCount) + // test rpc error mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp") assert.Nil(t, err) defer tb.Close() - scanner, _ = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts) - rpcFailedMocked := false + // test getScanner rpc error + scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts) + assert.Nil(t, err) + rpcGetScannerFailedMocked := false + recallGetScanner := true + var getScannerFailedMock *gomonkey.Patches successCount = 0 for { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - complete, _, _, _, error := scanner.Next(ctx) - // mock rpc error, follow request will be recovered automatically - if !rpcFailedMocked { - mock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest) - rpcFailedMocked = true + if recallGetScanner && rpcGetScannerFailedMocked { // GetScannerFailedMocked = true, recall GetScanner to trigger the error when execute scanner.Next + scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts) + assert.Nil(t, err) + } + complete, _, _, _, errNext := scanner.Next(ctx) + if !rpcGetScannerFailedMocked { // mock replicaSession.GetScanner rpc error, the next loop request will be failed + getScannerFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "GetScanner", GetScannerRpcErrorForTest) + rpcGetScannerFailedMocked = true + } + cancel() + if complete { + break + } + + if errNext == nil { + successCount++ + continue + } + // error encounter ERR_INVALID_STATE and auto-trigger re-config that means rpcGetScannerFailedMocked can be reset + if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") && + strings.Contains(errNext.Error(), "updateConfig=true") { + getScannerFailedMock.Reset() + recallGetScanner = false + } else if strings.Contains(errNext.Error(), "recover after next loop") { + continue } else { - mock.Reset() + break + } + } + // since re-call once getScanner, so the successCount = 100 + 1 + assert.Equal(t, 101, successCount) + + // test scan rpc error + getScannerFailedMock.Reset() + rpcScanFailedMocked := false + var scanFailedMock *gomonkey.Patches + successCount = 0 + scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts) + assert.Nil(t, err) + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*500) + complete, _, _, _, errNext := scanner.Next(ctx) + if !rpcScanFailedMocked { // mock scan rpc error, the next loop request will be failed but recovered automatically + scanFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest) + rpcScanFailedMocked = true } cancel() if complete { break } - if error == nil { + + if errNext == nil { successCount++ + continue + } + + // error encounter ERR_INVALID_STATE and auto-trigger re-config that means rpcGetScannerFailedMocked can be reset + if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") && + strings.Contains(errNext.Error(), "updateConfig=true") { + scanFailedMock.Reset() + } else if strings.Contains(errNext.Error(), "recover after next loop") { + continue + } else { + break } } assert.Equal(t, 100, successCount) diff --git a/go-client/pegasus/scanner.go b/go-client/pegasus/scanner.go index d1238c0626..9682994510 100644 --- a/go-client/pegasus/scanner.go +++ b/go-client/pegasus/scanner.go @@ -223,7 +223,13 @@ func (p *pegasusScanner) startScanPartition(ctx context.Context) (completed bool part := p.table.getPartitionByGpid(p.curGpid) response, err := part.GetScanner(ctx, p.curGpid, p.curHash, request) - + if err != nil { + p.batchStatus = batchRpcError + if updateConfig, _, errHandler := p.table.handleReplicaError(err, part); errHandler != nil { + err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler) + } + return + } err = p.onRecvScanResponse(response, err) if err == nil { return p.doNext(ctx)