diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 80b1ca5e5a8..b93cc8ea8ce 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -589,10 +589,17 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
 				return ctx.Err()
 			case task := <-s.requestRangeCh:
 				s.rangeChSizeGauge.Dec()
-				err := s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
-				if err != nil {
-					return errors.Trace(err)
-				}
+				// divideAndSendEventFeedToRegions could block for some time,
+				// since it must wait for the region lock to become available.
+				// In order to consume region range requests from `requestRangeCh`
+				// as soon as possible, we create a new goroutine to handle each.
+				// The order in which region ranges are processed does not matter;
+				// the region lock preserves the region access order.
+				// Besides, since the count and frequency of range requests are
+				// limited, we use ephemeral goroutines instead of a permanent one.
+				g.Go(func() error {
+					return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
+				})
 			}
 		}
 	})
@@ -660,6 +667,25 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
 	}
 
 	res := s.rangeLock.LockRange(ctx, sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer())
+	failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) {
+		// sleep briefly to wait until the region has split
+		time.Sleep(time.Second)
+		s.rangeLock.UnlockRange(sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer(), sri.ts)
+		regionNum := val.(int)
+		retryRanges := make([]regionspan.ComparableSpan, 0, regionNum)
+		start := []byte("a")
+		end := []byte("b1001")
+		for i := 0; i < regionNum; i++ {
+			span := regionspan.Span{Start: start, End: end}
+			retryRanges = append(retryRanges, regionspan.ToComparableSpan(span))
+			start = end
+			end = []byte(fmt.Sprintf("b%d", 1002+i))
+		}
+		res = regionspan.LockRangeResult{
+			Status:      regionspan.LockRangeStatusStale,
+			RetryRanges: retryRanges,
+		}
+	})
 
 	if res.Status == regionspan.LockRangeStatusWait {
 		res = res.WaitFn()
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 8aed668d578..3ad3a6d18c5 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -15,6 +15,7 @@ package kv
 
 import (
 	"context"
+	"fmt"
 	"net"
 	"sync"
 	"sync/atomic"
@@ -3083,3 +3084,124 @@ func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {
 
 	cancel()
 }
+
+// TestConcurrentProcessRangeRequest verifies that when the region range
+// request channel is full, the kv client can process requests correctly
+// without deadlock. This is more likely to happen when regions split and
+// merge frequently and many stale requests exist.
+func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := &sync.WaitGroup{}
+
+	requestIDs := new(sync.Map)
+	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
+	srv1 := newMockChangeDataService(c, ch1)
+	srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
+		for {
+			req, err := server.Recv()
+			if err != nil {
+				return
+			}
+			requestIDs.Store(req.RegionId, req.RequestId)
+		}
+	}
+	server1, addr1 := newMockService(ctx, c, srv1, wg)
+
+	defer func() {
+		close(ch1)
+		server1.Stop()
+		wg.Wait()
+	}()
+
+	cluster := mocktikv.NewCluster()
+	mvccStore := mocktikv.MustNewMVCCStore()
+	rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
+	c.Assert(err, check.IsNil)
+	pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
+	tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
+	c.Assert(err, check.IsNil)
+	kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
+	defer kvStorage.Close() //nolint:errcheck
+
+	storeID := uint64(1)
+	regionID := uint64(1000)
+	peerID := regionID + 1
+	cluster.AddStore(storeID, addr1)
+	cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID)
+
+	err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock", "1*return(20)")
+	c.Assert(err, check.IsNil)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock")
+	}()
+	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
+	isPullInit := &mockPullerInit{}
+	cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
+	eventCh := make(chan *model.RegionFeedEvent, 10)
+	wg.Add(1)
+	go func() {
+		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
+		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+		cdcClient.Close() //nolint:errcheck
+		wg.Done()
+	}()
+
+	// the kv client is blocked by failpoint injection, and after the region
+	// has split into more subregions, the kv client will continue to handle
+	// requests and will find stale region requests (which are also caused by
+	// failpoint injection).
+	regionNum := 20
+	for i := 1; i < regionNum; i++ {
+		regionID := uint64(i + 1000)
+		peerID := regionID + 1
+		// split regions into [min, b1001), [b1001, b1002), ... [bN, max)
+		cluster.SplitRaw(regionID-1, regionID, []byte(fmt.Sprintf("b%d", regionID)), []uint64{peerID}, peerID)
+	}
+
+	// wait until all regions have been requested from the cdc kv client
+	err = retry.Run(time.Millisecond*200, 20, func() error {
+		count := 0
+		requestIDs.Range(func(_, _ interface{}) bool {
+			count++
+			return true
+		})
+		if count == regionNum {
+			return nil
+		}
+		return errors.Errorf("region number %d is not as expected %d", count, regionNum)
+	})
+	c.Assert(err, check.IsNil)
+
+	// send an initialized event and a resolved ts event to each region
+	requestIDs.Range(func(key, value interface{}) bool {
+		regionID := key.(uint64)
+		requestID := value.(uint64)
+		initialized := mockInitializedEvent(regionID, requestID)
+		ch1 <- initialized
+		resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
+			{
+				RegionId:  regionID,
+				RequestId: requestID,
+				Event:     &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
+			},
+		}}
+		ch1 <- resolved
+		return true
+	})
+
+	resolvedCount := 0
+checkEvent:
+	for {
+		select {
+		case <-eventCh:
+			resolvedCount++
+			if resolvedCount == regionNum*2 {
+				break checkEvent
+			}
+		case <-time.After(time.Second):
+			c.Errorf("no more events received")
+		}
+	}
+
+	cancel()
+}
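
Review note: the deadlock-avoidance pattern in the first hunk can be sketched in isolation. A consumer that calls a blocking handler inline can deadlock when handling one task enqueues follow-up tasks into an already-full channel (the region-split case this patch fixes); draining the channel immediately and running the blocking work in an ephemeral errgroup goroutine keeps the consumer responsive. The sketch below is a minimal standalone illustration, not code from this patch: `slowHandle` and the int-typed channel are hypothetical stand-ins for `divideAndSendEventFeedToRegions` and `requestRangeCh`.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// slowHandle is a hypothetical stand-in for divideAndSendEventFeedToRegions:
// it may block for a while, e.g. waiting for a region lock.
func slowHandle(ctx context.Context, task int) error {
	select {
	case <-time.After(100 * time.Millisecond): // simulated lock wait
		fmt.Println("handled task", task)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	g, ctx := errgroup.WithContext(ctx)
	requestRangeCh := make(chan int, 2) // small buffer, like requestRangeCh

	// Producer: in the real client, handling one range may enqueue follow-up
	// ranges, so a consumer that blocks inside the handler can deadlock once
	// the buffer is full.
	g.Go(func() error {
		for i := 0; i < 10; i++ {
			select {
			case requestRangeCh <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		close(requestRangeCh)
		return nil
	})

	// Consumer: drain the channel immediately and do the blocking work in an
	// ephemeral goroutine, mirroring the g.Go call added by this patch.
	g.Go(func() error {
		for {
			select {
			case task, ok := <-requestRangeCh:
				if !ok {
					return nil
				}
				g.Go(func() error { return slowHandle(ctx, task) })
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```

Note also that the failpoint expression `1*return(20)` used in the test makes `kvClientMockRangeLock` fire exactly once, yielding 20 as the mocked region count, so only the first `LockRange` result is replaced with a stale-range retry result.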