From 3129458ff98a050307bf9c3fe0a7a2f9a7e95410 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 28 Apr 2021 16:04:03 +0800 Subject: [PATCH 1/5] kv/client: use ephemeral goroutine to handle region range request --- cdc/kv/client.go | 35 +++++++++++-- cdc/kv/client_test.go | 116 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 5 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 8aedeeba6b5..7cb78494d94 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -600,10 +600,17 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { return ctx.Err() case task := <-s.requestRangeCh: s.rangeChSizeGauge.Dec() - err := s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts) - if err != nil { - return errors.Trace(err) - } + // divideAndSendEventFeedToRegions could block for some time, + // since it must wait for the region lock to become available. In order to + // consume region range requests from `requestRangeCh` as soon as + // possible, we create a new goroutine to handle each one. + // The order in which we process region ranges does not matter; the + // region lock keeps the region access sequence. + // Besides, the count or frequency of range requests is limited, so + // we use ephemeral goroutines instead of a permanent goroutine. 
+ g.Go(func() error { + return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts) + }) } } }) @@ -671,6 +678,24 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single } res := s.rangeLock.LockRange(ctx, sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer()) + failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) { + // short sleep to wait region has split + time.Sleep(time.Second) + regionNum := val.(int) + retryRanges := make([]regionspan.ComparableSpan, 0, regionNum) + start := []byte("a") + end := []byte("b1001") + for i := 0; i < regionNum; i++ { + span := regionspan.Span{Start: start, End: end} + retryRanges = append(retryRanges, regionspan.ToComparableSpan(span)) + start = end + end = []byte(fmt.Sprintf("b%d", 1002+i)) + } + res = regionspan.LockRangeResult{ + Status: regionspan.LockRangeStatusStale, + RetryRanges: retryRanges, + } + }) if res.Status == regionspan.LockRangeStatusWait { res = res.WaitFn() @@ -1043,7 +1068,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( if err != nil { return errors.Trace(err) } - log.Debug("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id)) + log.Info("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id)) nextSpan.Start = region.EndKey diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 62db799429e..f102a30c7e0 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -15,6 +15,7 @@ package kv import ( "context" + "fmt" "net" "sync" "sync/atomic" @@ -3052,3 +3053,118 @@ func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) { cancel() } + +// TestConcurrentProcessRangeRequest checks that when the region range request channel is full, +// the kv client can still process requests correctly without deadlock. This is more likely +// to happen when regions split and merge frequently and a large number of stale requests exist. 
+func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + requestIDs := new(sync.Map) + ch1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataService(c, ch1) + srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { + for { + req, err := server.Recv() + if err != nil { + return + } + requestIDs.Store(req.RegionId, req.RequestId) + } + } + server1, addr1 := newMockService(ctx, c, srv1, wg) + + defer func() { + close(ch1) + server1.Stop() + wg.Wait() + }() + + rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("") + c.Assert(err, check.IsNil) + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + c.Assert(err, check.IsNil) + kvStorage := newStorageWithCurVersionCache(tiStore, addr1) + defer kvStorage.Close() //nolint:errcheck + + storeID := uint64(1) + regionID := uint64(1000) + peerID := regionID + 1 + cluster.AddStore(storeID, addr1) + cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID) + + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock", "1*return(20)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock") + }() + lockresolver := txnutil.NewLockerResolver(kvStorage) + isPullInit := &mockPullerInit{} + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{}) + eventCh := make(chan *model.RegionFeedEvent, 10) + wg.Add(1) + go func() { + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh) + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + cdcClient.Close() //nolint:errcheck + wg.Done() + }() + + regionNum := 20 + for i := 1; i < 
regionNum; i++ { + regionID := uint64(i + 1000) + peerID := regionID + 1 + // split regions to [min, b1001), [b1001, b1002), ... [bN, max) + cluster.SplitRaw(regionID-1, regionID, []byte(fmt.Sprintf("b%d", regionID)), []uint64{peerID}, peerID) + } + + // wait all regions requested from cdc kv client + err = retry.Run(time.Millisecond*200, 20, func() error { + count := 0 + requestIDs.Range(func(_, _ interface{}) bool { + count++ + return true + }) + if count == regionNum-1 { + return nil + } + return errors.Errorf("region number %d is not as expected %d", count, regionNum) + }) + c.Assert(err, check.IsNil) + + requestIDs.Range(func(key, value interface{}) bool { + regionID := key.(uint64) + requestID := value.(uint64) + initialized := mockInitializedEvent(regionID, requestID) + ch1 <- initialized + resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, + }, + }} + ch1 <- resolved + return true + }) + + resolvedCount := 0 +checkEvent: + for { + select { + case <-eventCh: + resolvedCount++ + if resolvedCount == (regionNum-1)*2 { + break checkEvent + } + case <-time.After(time.Second): + c.Errorf("no more events received") + } + } + + cancel() +} From b37896c17fbe77b5ec6a22bc03fe5cbab66b4164 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 29 Apr 2021 16:39:56 +0800 Subject: [PATCH 2/5] add some comments --- cdc/kv/client_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index f102a30c7e0..096db47337a 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -3114,6 +3114,9 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { wg.Done() }() + // the kv client is blocked by failpoint injection, and after region has split + // into more sub regions, the kv client will continue to handle and will find + // stale region requests (which is also caused by failpoint 
injection). regionNum := 20 for i := 1; i < regionNum; i++ { regionID := uint64(i + 1000) @@ -3122,7 +3125,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { cluster.SplitRaw(regionID-1, regionID, []byte(fmt.Sprintf("b%d", regionID)), []uint64{peerID}, peerID) } - // wait all regions requested from cdc kv client + // wait for all regions requested from cdc kv client err = retry.Run(time.Millisecond*200, 20, func() error { count := 0 requestIDs.Range(func(_, _ interface{}) bool { @@ -3136,6 +3139,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { }) c.Assert(err, check.IsNil) + // send initialized event and a resolved ts event to each region requestIDs.Range(func(key, value interface{}) bool { regionID := key.(uint64) requestID := value.(uint64) From 179df48f5f7e24a399f4c2c8cd6743a9baee67d3 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 29 Apr 2021 21:30:41 +0800 Subject: [PATCH 3/5] refine test cases --- cdc/kv/client.go | 1 + cdc/kv/client_test.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 7cb78494d94..0407e817185 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -681,6 +681,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) { // short sleep to wait region has split time.Sleep(time.Second) + s.rangeLock.UnlockRange(sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer(), sri.ts) regionNum := val.(int) retryRanges := make([]regionspan.ComparableSpan, 0, regionNum) start := []byte("a") diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 096db47337a..1eb6954eb1d 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -3132,7 +3132,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { count++ return true }) - if count == regionNum-1 { + if count == regionNum { return nil } return 
errors.Errorf("region number %d is not as expected %d", count, regionNum) @@ -3162,7 +3162,7 @@ checkEvent: select { case <-eventCh: resolvedCount++ - if resolvedCount == (regionNum-1)*2 { + if resolvedCount == regionNum*2 { break checkEvent } case <-time.After(time.Second): From cd5b8372a688bb29dd3084ae352f5f501ce3a3be Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 29 Apr 2021 21:32:10 +0800 Subject: [PATCH 4/5] revert log level --- cdc/kv/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 0407e817185..d74eebed46a 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1069,7 +1069,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( if err != nil { return errors.Trace(err) } - log.Info("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id)) + log.Debug("get partialSpan", zap.Stringer("span", partialSpan), zap.Uint64("regionID", region.Id)) nextSpan.Start = region.EndKey From 6d320fe43c94fa298123fdd05a90eabfcf1da883 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 19 May 2021 10:40:59 +0800 Subject: [PATCH 5/5] fix dep api change --- cdc/kv/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 2d898b758bf..77ab3fe0003 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -3084,7 +3084,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { wg.Wait() }() - rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("") + rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler()) c.Assert(err, check.IsNil) pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)