kv/client: use ephemeral goroutine to handle region range request #1720

Merged
8 commits merged on May 19, 2021
34 changes: 30 additions & 4 deletions cdc/kv/client.go
@@ -601,10 +601,17 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
return ctx.Err()
case task := <-s.requestRangeCh:
s.rangeChSizeGauge.Dec()
err := s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
if err != nil {
return errors.Trace(err)
}
// divideAndSendEventFeedToRegions could block for some time,
// since it must wait for the region lock to become available. In order to
// consume region range requests from `requestRangeCh` as soon as
// possible, we create a new goroutine to handle each of them.
// The order in which region ranges are processed does not matter, since
// the region lock preserves the region access sequence.
// Besides, since the count and frequency of range requests are limited,
// we use ephemeral goroutines instead of a permanent goroutine.
g.Go(func() error {
return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
})
}
}
})
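
As an aside (not part of the diff), the snippet below is a minimal, self-contained sketch of the pattern this change adopts: an errgroup consumer drains a task channel and hands each potentially blocking task to an ephemeral goroutine, so slow work never stalls the channel consumer. All names and durations here are illustrative, not ticdc code.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    g, ctx := errgroup.WithContext(ctx)
    taskCh := make(chan int, 4)

    // Consumer loop: it never blocks on a slow task, so taskCh keeps draining.
    g.Go(func() error {
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case task := <-taskCh:
                // Hand the potentially blocking work to an ephemeral goroutine,
                // mirroring the change above.
                g.Go(func() error {
                    time.Sleep(100 * time.Millisecond) // stands in for waiting on the region lock
                    fmt.Println("handled task", task)
                    return nil
                })
            }
        }
    })

    for i := 0; i < 4; i++ {
        taskCh <- i
    }
    if err := g.Wait(); err != nil {
        fmt.Println("exit:", err) // the context timeout eventually stops the consumer loop
    }
}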
@@ -672,6 +679,25 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
}

res := s.rangeLock.LockRange(ctx, sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer())
failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) {
// short sleep to wait until the region has split
time.Sleep(time.Second)
s.rangeLock.UnlockRange(sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer(), sri.ts)
regionNum := val.(int)
retryRanges := make([]regionspan.ComparableSpan, 0, regionNum)
start := []byte("a")
end := []byte("b1001")
for i := 0; i < regionNum; i++ {
span := regionspan.Span{Start: start, End: end}
retryRanges = append(retryRanges, regionspan.ToComparableSpan(span))
start = end
end = []byte(fmt.Sprintf("b%d", 1002+i))
}
res = regionspan.LockRangeResult{
Status: regionspan.LockRangeStatusStale,
RetryRanges: retryRanges,
}
})

if res.Status == regionspan.LockRangeStatusWait {
res = res.WaitFn()
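As background for the `kvClientMockRangeLock` failpoint added above (not part of the diff): with pingcap/failpoint, an Inject marker in production code is paired with Enable/Disable calls in tests, and the marker only becomes active after failpoint-ctl rewrites it into a real check before the test binary is built. The sketch below uses placeholder names and paths, not the ones from this PR.

package example

import (
    "fmt"

    "github.com/pingcap/failpoint"
)

// doWork contains a failpoint marker. In normal builds the closure is skipped;
// once failpoint-ctl has rewritten the marker and the failpoint is enabled,
// the closure runs with the injected value.
func doWork() int {
    result := 1
    failpoint.Inject("exampleMockResult", func(val failpoint.Value) {
        // val carries the argument from the enable expression, e.g. return(20).
        result = val.(int)
    })
    return result
}

// A test switches the failpoint on by its full package path plus name.
// "1*return(20)" means: trigger exactly once, passing 20 as the value.
func exampleTest() {
    if err := failpoint.Enable("path/to/pkg/exampleMockResult", "1*return(20)"); err != nil {
        panic(err)
    }
    defer func() {
        _ = failpoint.Disable("path/to/pkg/exampleMockResult")
    }()
    fmt.Println(doWork()) // 20 while the failpoint is active (first call only)
}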
120 changes: 120 additions & 0 deletions cdc/kv/client_test.go
@@ -15,6 +15,7 @@ package kv

import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
@@ -3053,3 +3054,122 @@ func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {

cancel()
}

// TestConcurrentProcessRangeRequest verifies that when the region range request
// channel is full, the kv client can still process requests correctly without
// deadlock. This is more likely to happen when regions split and merge frequently
// and a large number of stale requests exist.
func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

requestIDs := new(sync.Map)
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
return
}
requestIDs.Store(req.RegionId, req.RequestId)
}
}
server1, addr1 := newMockService(ctx, c, srv1, wg)

defer func() {
close(ch1)
server1.Stop()
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

storeID := uint64(1)
regionID := uint64(1000)
peerID := regionID + 1
cluster.AddStore(storeID, addr1)
cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock", "1*return(20)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// the kv client is blocked by the failpoint injection, and after the region has
// split into more sub regions, the kv client continues handling requests and
// will find stale region requests (which are also produced by the failpoint
// injection).
regionNum := 20
for i := 1; i < regionNum; i++ {
regionID := uint64(i + 1000)
peerID := regionID + 1
// split regions into [min, b1001), [b1001, b1002), ... [bN, max)
cluster.SplitRaw(regionID-1, regionID, []byte(fmt.Sprintf("b%d", regionID)), []uint64{peerID}, peerID)
}

// wait until all regions have been requested by the cdc kv client
err = retry.Run(time.Millisecond*200, 20, func() error {
count := 0
requestIDs.Range(func(_, _ interface{}) bool {
count++
return true
})
if count == regionNum {
return nil
}
return errors.Errorf("region number %d is not as expected %d", count, regionNum)
})
c.Assert(err, check.IsNil)

// send an initialized event and a resolved ts event to each region
requestIDs.Range(func(key, value interface{}) bool {
regionID := key.(uint64)
requestID := value.(uint64)
initialized := mockInitializedEvent(regionID, requestID)
ch1 <- initialized
resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID,
RequestId: requestID,
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
},
}}
ch1 <- resolved
return true
})

resolvedCount := 0
checkEvent:
for {
select {
case <-eventCh:
resolvedCount++
if resolvedCount == regionNum*2 {
break checkEvent
}
case <-time.After(time.Second):
c.Errorf("no more events received")
}
}

cancel()
}
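
A side note on the waiting step above: retry.Run(time.Millisecond*200, 20, ...) retries the check (here up to 20 times, starting from a 200ms interval). In spirit it is equivalent to a plain polling helper like the illustrative sketch below; retry.Run's exact backoff behaviour may differ, so treat this only as an illustration, not ticdc code.

package example

import "time"

// pollUntil re-runs check up to maxTries times, sleeping interval between
// failed attempts, and returns the last error if the condition never holds.
func pollUntil(interval time.Duration, maxTries int, check func() error) error {
    var err error
    for i := 0; i < maxTries; i++ {
        if err = check(); err == nil {
            return nil
        }
        time.Sleep(interval)
    }
    return err
}

In the test above, the condition being polled is that the mock server has recorded regionNum distinct region IDs in requestIDs.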