Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient(ticdc): fix kvclient takes too long time to recover (#3612) #3662

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 9 additions & 28 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,6 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
}
}

// partialClone clones part fields of singleRegionInfo, this is used when error
// happens, kv client needs to recover region request from singleRegionInfo
func (s *singleRegionInfo) partialClone() singleRegionInfo {
sri := singleRegionInfo{
verID: s.verID,
span: s.span.Clone(),
ts: s.ts,
rpcCtx: &tikv.RPCContext{},
}
if s.rpcCtx != nil {
sri.rpcCtx.Addr = s.rpcCtx.Addr
}
return sri
}

type regionErrorInfo struct {
singleRegionInfo
err error
Expand Down Expand Up @@ -358,21 +343,13 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
err = version.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
return
}
client := cdcpb.NewChangeDataClient(conn.ClientConn)
var streamClient cdcpb.ChangeData_EventFeedClient
streamClient, err = client.EventFeed(ctx)
if err != nil {
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return
Expand All @@ -383,7 +360,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
log.Debug("created stream to store", zap.String("addr", addr))
return nil
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(8), retry.WithIsRetryableErr(cerror.IsRetryableError))
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
return
}

Expand Down Expand Up @@ -1025,7 +1002,6 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
innerErr := eerr.err
if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
// TODO: Handle the case that notleader.GetLeader() is nil.
s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
Expand Down Expand Up @@ -1059,10 +1035,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
default:
//[TODO] Move all OnSendFail logic here
// We expect some unknown error to trigger RegionCache recheck its store state and change leader to peer to
// make some detection(peer may tell us where new leader is)
// RegionCache.OnSendFail is thread_safe inner.
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
if errInfo.rpcCtx.Meta != nil {
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
}
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
}

failpoint.Inject("kvClientRegionReentrantErrorDelay", nil)
Expand Down Expand Up @@ -1160,6 +1138,9 @@ func (s *eventFeedSession) receiveFromStream(
zap.Uint64("storeID", storeID),
zap.Error(err),
)
// Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down
// tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new
// election
}

// Use the same delay mechanism as `stream.Send` error handling, since
Expand Down
41 changes: 18 additions & 23 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -278,7 +279,18 @@ func newMockServiceSpecificAddr(
lis, err := lc.Listen(ctx, "tcp", listenAddr)
c.Assert(err, check.IsNil)
addr = lis.Addr().String()
grpcServer = grpc.NewServer()
kaep := keepalive.EnforcementPolicy{
MinTime: 60 * time.Second,
PermitWithoutStream: true,
}
kasp := keepalive.ServerParameters{
MaxConnectionIdle: 60 * time.Second, // If a client is idle for 60 seconds, send a GOAWAY
MaxConnectionAge: 60 * time.Second, // If any connection is alive for more than 60 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
cdcpb.RegisterChangeDataServer(grpcServer, srv)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1594,10 +1606,7 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
var genLock sync.Mutex
nextVer := -1
call := int32(0)
// 20 here not too much, since check version itself has 3 time retry, and
// region cache could also call get store API, which will trigger version
// generator too.
versionGenCallBoundary := int32(20)
versionGenCallBoundary := int32(8)
gen := func() string {
genLock.Lock()
defer genLock.Unlock()
Expand Down Expand Up @@ -1652,7 +1661,8 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
eventCh := make(chan model.RegionFeedEvent, 10)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -2031,7 +2041,6 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
ch1 <- event
}
clientWg.Wait()

cancel()
}

Expand Down Expand Up @@ -2514,22 +2523,6 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
cancel()
}

func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
defer testleak.AfterTest(c)()
sri := newSingleRegionInfo(
tikv.RegionVerID{},
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
1000, &tikv.RPCContext{})
sri2 := sri.partialClone()
sri2.ts = 2000
sri2.span.End[0] = 'b'
c.Assert(sri.ts, check.Equals, uint64(1000))
c.Assert(sri.span.String(), check.Equals, "[61, 63)")
c.Assert(sri2.ts, check.Equals, uint64(2000))
c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
}

// TestResolveLockNoCandidate tests the resolved ts manager can work normally
// when no region exceeds reslove lock interval, that is what candidate means.
func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
Expand Down Expand Up @@ -2851,6 +2844,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
server1Stopped <- struct{}{}
}()
for {
// Currently no msg more than 60s will cause a GoAway msg to end the connection
_, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
Expand Down Expand Up @@ -2899,6 +2893,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
initialized := mockInitializedEvent(regionID3, currentRequestID())
ch1 <- initialized

// Connection close for timeout
<-server1Stopped

var requestIds sync.Map
Expand Down
7 changes: 3 additions & 4 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,17 +784,16 @@ func (w *regionWorker) evictAllRegions() error {
}
state.markStopped()
w.delRegionState(state.sri.verID.GetID())
singleRegionInfo := state.sri.partialClone()
if state.lastResolvedTs > singleRegionInfo.ts {
singleRegionInfo.ts = state.lastResolvedTs
if state.lastResolvedTs > state.sri.ts {
state.sri.ts = state.lastResolvedTs
}
revokeToken := !state.initialized
state.lock.Unlock()
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: singleRegionInfo,
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
Expand Down