diff --git a/cdc/kv/client.go b/cdc/kv/client.go index cd71ab732c5..479860a06d5 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -132,21 +132,6 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan, } } -// partialClone clones part fields of singleRegionInfo, this is used when error -// happens, kv client needs to recover region request from singleRegionInfo -func (s *singleRegionInfo) partialClone() singleRegionInfo { - sri := singleRegionInfo{ - verID: s.verID, - span: s.span.Clone(), - ts: s.ts, - rpcCtx: &tikv.RPCContext{}, - } - if s.rpcCtx != nil { - sri.rpcCtx.Addr = s.rpcCtx.Addr - } - return sri -} - type regionErrorInfo struct { singleRegionInfo err error @@ -348,10 +333,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) } err = version.CheckStoreVersion(ctx, c.pd, storeID) if err != nil { - // TODO: we don't close gPRC conn here, let it goes into TransientFailure - // state. If the store recovers, the gPRC conn can be reused. But if - // store goes away forever, the conn will be leaked, we need a better - // connection pool. log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID)) return } @@ -359,10 +340,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) var streamClient cdcpb.ChangeData_EventFeedClient streamClient, err = client.EventFeed(ctx) if err != nil { - // TODO: we don't close gPRC conn here, let it goes into TransientFailure - // state. If the store recovers, the gPRC conn can be reused. But if - // store goes away forever, the conn will be leaked, we need a better - // connection pool. err = cerror.WrapError(cerror.ErrTiKVEventFeed, err) log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err)) return @@ -373,7 +350,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) } log.Debug("created stream to store", zap.String("addr", addr)) return nil - }, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(8), retry.WithIsRetryableErr(cerror.IsRetryableError)) + }, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError)) return } @@ -1015,8 +992,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI innerErr := eerr.err if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() +<<<<<<< HEAD // TODO: Handle the case that notleader.GetLeader() is nil. s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader().GetStoreId(), errInfo.rpcCtx.AccessIdx) +======= + s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) +>>>>>>> 82c4d68de (kvclient(ticdc): fix kvclient takes too long time to recover (#3612)) } else if innerErr.GetEpochNotMatch() != nil { // TODO: If only confver is updated, we don't need to reload the region from region cache. metricFeedEpochNotMatchCounter.Inc() @@ -1049,10 +1030,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() default: + //[TODO] Move all OnSendFail logic here + // We expect some unknown error to trigger RegionCache recheck its store state and change leader to peer to + // make some detection(peer may tell us where new leader is) + // RegionCache.OnSendFail is thread_safe inner. bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) - if errInfo.rpcCtx.Meta != nil { - s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) - } + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) } failpoint.Inject("kvClientRegionReentrantErrorDelay", nil) @@ -1150,6 +1133,9 @@ func (s *eventFeedSession) receiveFromStream( zap.Uint64("storeID", storeID), zap.Error(err), ) + // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down + // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new + // election } // Use the same delay mechanism as `stream.Send` error handling, since diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 11854493921..6960ed1b09d 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -46,6 +46,7 @@ import ( pd "github.com/tikv/pd/client" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) func Test(t *testing.T) { @@ -290,7 +291,18 @@ func newMockServiceSpecificAddr( lis, err := lc.Listen(ctx, "tcp", listenAddr) c.Assert(err, check.IsNil) addr = lis.Addr().String() - grpcServer = grpc.NewServer() + kaep := keepalive.EnforcementPolicy{ + MinTime: 60 * time.Second, + PermitWithoutStream: true, + } + kasp := keepalive.ServerParameters{ + MaxConnectionIdle: 60 * time.Second, // If a client is idle for 60 seconds, send a GOAWAY + MaxConnectionAge: 60 * time.Second, // If any connection is alive for more than 60 seconds, send a GOAWAY + MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections + Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active + Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead + } + grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) cdcpb.RegisterChangeDataServer(grpcServer, srv) wg.Add(1) go func() { @@ -1590,10 +1602,7 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) { var genLock sync.Mutex nextVer := -1 call := int32(0) - // 20 here not too much, since check version itself has 3 time retry, and - // region cache could also call get store API, which will trigger version - // generator too. - versionGenCallBoundary := int32(20) + versionGenCallBoundary := int32(8) gen := func() string { genLock.Lock() defer genLock.Unlock() @@ -1647,7 +1656,15 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) { }() lockresolver, isPullInit, grpcPool, cdcClient := createCDCKVClient(ctx, pdClient, kvStorage) defer grpcPool.Close() +<<<<<<< HEAD eventCh := make(chan model.RegionFeedEvent, 10) +======= + regionCache := tikv.NewRegionCache(pdClient) + defer regionCache.Close() + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache) + // NOTICE: eventCh may block the main logic of EventFeed + eventCh := make(chan model.RegionFeedEvent, 128) +>>>>>>> 82c4d68de (kvclient(ticdc): fix kvclient takes too long time to recover (#3612)) wg.Add(1) go func() { defer wg.Done() @@ -2022,7 +2039,6 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change ch1 <- event } clientWg.Wait() - cancel() } @@ -2503,6 +2519,7 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) { cancel() } +<<<<<<< HEAD func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) { defer testleak.AfterTest(c)() sri := newSingleRegionInfo( @@ -2519,6 +2536,8 @@ func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) { c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{}) } +======= +>>>>>>> 82c4d68de (kvclient(ticdc): fix kvclient takes too long time to recover (#3612)) // TestResolveLockNoCandidate tests the resolved ts manager can work normally // when no region exceeds reslove lock interval, that is what candidate means. func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) { @@ -2836,6 +2855,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { server1Stopped <- struct{}{} }() for { + // Currently no msg more than 60s will cause a GoAway msg to end the connection _, err := server.Recv() if err != nil { log.Error("mock server error", zap.Error(err)) @@ -2882,6 +2902,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { initialized := mockInitializedEvent(regionID3, currentRequestID()) ch1 <- initialized + // Connection close for timeout <-server1Stopped var requestIds sync.Map diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index ecf368e8cd2..f4cf1755d72 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -784,9 +784,8 @@ func (w *regionWorker) evictAllRegions() error { } state.markStopped() w.delRegionState(state.sri.verID.GetID()) - singleRegionInfo := state.sri.partialClone() - if state.lastResolvedTs > singleRegionInfo.ts { - singleRegionInfo.ts = state.lastResolvedTs + if state.lastResolvedTs > state.sri.ts { + state.sri.ts = state.lastResolvedTs } revokeToken := !state.initialized state.lock.Unlock() @@ -794,7 +793,7 @@ func (w *regionWorker) evictAllRegions() error { // region worker exits, we must use the parent context to prevent // regionErrorInfo loss. err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{ - singleRegionInfo: singleRegionInfo, + singleRegionInfo: state.sri, err: cerror.ErrEventFeedAborted.FastGenByArgs(), }, revokeToken) return err == nil