Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3612
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
maxshuang authored and ti-chi-bot committed Nov 29, 2021
1 parent 17d62e2 commit ae7527a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
40 changes: 13 additions & 27 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,6 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
}
}

// partialClone clones part fields of singleRegionInfo, this is used when error
// happens, kv client needs to recover region request from singleRegionInfo
func (s *singleRegionInfo) partialClone() singleRegionInfo {
sri := singleRegionInfo{
verID: s.verID,
span: s.span.Clone(),
ts: s.ts,
rpcCtx: &tikv.RPCContext{},
}
if s.rpcCtx != nil {
sri.rpcCtx.Addr = s.rpcCtx.Addr
}
return sri
}

type regionErrorInfo struct {
singleRegionInfo
err error
Expand Down Expand Up @@ -348,21 +333,13 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
err = version.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
return
}
client := cdcpb.NewChangeDataClient(conn.ClientConn)
var streamClient cdcpb.ChangeData_EventFeedClient
streamClient, err = client.EventFeed(ctx)
if err != nil {
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return
Expand All @@ -373,7 +350,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
log.Debug("created stream to store", zap.String("addr", addr))
return nil
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(8), retry.WithIsRetryableErr(cerror.IsRetryableError))
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
return
}

Expand Down Expand Up @@ -1015,8 +992,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
innerErr := eerr.err
if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
<<<<<<< HEAD
// TODO: Handle the case that notleader.GetLeader() is nil.
s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader().GetStoreId(), errInfo.rpcCtx.AccessIdx)
=======
s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
>>>>>>> 82c4d68de (kvclient(ticdc): fix kvclient takes too long time to recover (#3612))
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
metricFeedEpochNotMatchCounter.Inc()
Expand Down Expand Up @@ -1049,10 +1030,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
default:
//[TODO] Move all OnSendFail logic here
// We expect some unknown error to trigger RegionCache recheck its store state and change leader to peer to
// make some detection(peer may tell us where new leader is)
// RegionCache.OnSendFail is thread_safe inner.
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
if errInfo.rpcCtx.Meta != nil {
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
}
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
}

failpoint.Inject("kvClientRegionReentrantErrorDelay", nil)
Expand Down Expand Up @@ -1150,6 +1133,9 @@ func (s *eventFeedSession) receiveFromStream(
zap.Uint64("storeID", storeID),
zap.Error(err),
)
// Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down
// tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new
// election
}

// Use the same delay mechanism as `stream.Send` error handling, since
Expand Down
33 changes: 27 additions & 6 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -290,7 +291,18 @@ func newMockServiceSpecificAddr(
lis, err := lc.Listen(ctx, "tcp", listenAddr)
c.Assert(err, check.IsNil)
addr = lis.Addr().String()
grpcServer = grpc.NewServer()
kaep := keepalive.EnforcementPolicy{
MinTime: 60 * time.Second,
PermitWithoutStream: true,
}
kasp := keepalive.ServerParameters{
MaxConnectionIdle: 60 * time.Second, // If a client is idle for 60 seconds, send a GOAWAY
MaxConnectionAge: 60 * time.Second, // If any connection is alive for more than 60 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
cdcpb.RegisterChangeDataServer(grpcServer, srv)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1590,10 +1602,7 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
var genLock sync.Mutex
nextVer := -1
call := int32(0)
// 20 here not too much, since check version itself has 3 time retry, and
// region cache could also call get store API, which will trigger version
// generator too.
versionGenCallBoundary := int32(20)
versionGenCallBoundary := int32(8)
gen := func() string {
genLock.Lock()
defer genLock.Unlock()
Expand Down Expand Up @@ -1647,7 +1656,15 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
}()
lockresolver, isPullInit, grpcPool, cdcClient := createCDCKVClient(ctx, pdClient, kvStorage)
defer grpcPool.Close()
<<<<<<< HEAD
eventCh := make(chan model.RegionFeedEvent, 10)
=======
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
>>>>>>> 82c4d68de (kvclient(ticdc): fix kvclient takes too long time to recover (#3612))
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -2022,7 +2039,6 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
ch1 <- event
}
clientWg.Wait()

cancel()
}

Expand Down Expand Up @@ -2503,6 +2519,7 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
cancel()
}

<<<<<<< HEAD
func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
defer testleak.AfterTest(c)()
sri := newSingleRegionInfo(
Expand All @@ -2519,6 +2536,8 @@ func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
}

=======
>>>>>>> 82c4d68de (kvclient(ticdc): fix kvclient takes too long time to recover (#3612))
// TestResolveLockNoCandidate tests the resolved ts manager can work normally
// when no region exceeds reslove lock interval, that is what candidate means.
func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
Expand Down Expand Up @@ -2836,6 +2855,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
server1Stopped <- struct{}{}
}()
for {
// Currently no msg more than 60s will cause a GoAway msg to end the connection
_, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
Expand Down Expand Up @@ -2882,6 +2902,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
initialized := mockInitializedEvent(regionID3, currentRequestID())
ch1 <- initialized

// Connection close for timeout
<-server1Stopped

var requestIds sync.Map
Expand Down
7 changes: 3 additions & 4 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,17 +784,16 @@ func (w *regionWorker) evictAllRegions() error {
}
state.markStopped()
w.delRegionState(state.sri.verID.GetID())
singleRegionInfo := state.sri.partialClone()
if state.lastResolvedTs > singleRegionInfo.ts {
singleRegionInfo.ts = state.lastResolvedTs
if state.lastResolvedTs > state.sri.ts {
state.sri.ts = state.lastResolvedTs
}
revokeToken := !state.initialized
state.lock.Unlock()
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: singleRegionInfo,
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
Expand Down

0 comments on commit ae7527a

Please sign in to comment.