Skip to content

Commit

Permalink
kv/client: use ephemeral goroutine to handle region range request (#1720
Browse files Browse the repository at this point in the history
) (#1800)
  • Loading branch information
ti-chi-bot authored May 19, 2021
1 parent 88a0a64 commit f0f47ae
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 4 deletions.
34 changes: 30 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,17 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
return ctx.Err()
case task := <-s.requestRangeCh:
s.rangeChSizeGauge.Dec()
err := s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
if err != nil {
return errors.Trace(err)
}
// divideAndSendEventFeedToRegions could block for some time,
// since it must wait for the region lock to become available. In order
// to consume region range requests from `requestRangeCh` as soon as
// possible, we create a new goroutine to handle each of them.
// The sequence in which region ranges are processed does not matter;
// the region lock keeps the region access sequence.
// Besides, since the count and frequency of range requests are limited,
// we use an ephemeral goroutine instead of a permanent goroutine.
g.Go(func() error {
return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
})
}
}
})
Expand Down Expand Up @@ -660,6 +667,25 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
}

res := s.rangeLock.LockRange(ctx, sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer())
failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) {
// short sleep to wait until the region has split
time.Sleep(time.Second)
s.rangeLock.UnlockRange(sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer(), sri.ts)
regionNum := val.(int)
retryRanges := make([]regionspan.ComparableSpan, 0, regionNum)
start := []byte("a")
end := []byte("b1001")
for i := 0; i < regionNum; i++ {
span := regionspan.Span{Start: start, End: end}
retryRanges = append(retryRanges, regionspan.ToComparableSpan(span))
start = end
end = []byte(fmt.Sprintf("b%d", 1002+i))
}
res = regionspan.LockRangeResult{
Status: regionspan.LockRangeStatusStale,
RetryRanges: retryRanges,
}
})

if res.Status == regionspan.LockRangeStatusWait {
res = res.WaitFn()
Expand Down
122 changes: 122 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -3083,3 +3084,124 @@ func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {

cancel()
}

// TestConcurrentProcessRangeRequest verifies that when the region range
// request channel is full, the kv client can process requests correctly
// without deadlock. This is more likely to happen when regions split and
// merge frequently and large numbers of stale requests exist.
func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

// requestIDs records the request ID observed for each region ID, so the
// test can later echo back events matching the outstanding requests.
requestIDs := new(sync.Map)
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
// The mock server only records incoming requests; it never replies on its
// own — replies are injected through ch1 later in the test.
srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
return
}
requestIDs.Store(req.RegionId, req.RequestId)
}
}
server1, addr1 := newMockService(ctx, c, srv1, wg)

defer func() {
close(ch1)
server1.Stop()
wg.Wait()
}()

cluster := mocktikv.NewCluster()
mvccStore := mocktikv.MustNewMVCCStore()
rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

storeID := uint64(1)
regionID := uint64(1000)
peerID := regionID + 1
cluster.AddStore(storeID, addr1)
cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID)

// The failpoint fires exactly once ("1*") and makes the range lock report
// 20 stale retry ranges, flooding the range request channel with retries.
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock", "1*return(20)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientMockRangeLock")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
// EventFeed blocks until the context is canceled at the end of the test;
// the goroutine then asserts the feed terminated with context.Canceled.
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// The kv client is blocked by the failpoint injection above, and after the
// region has split into more sub regions, the kv client continues handling
// and finds stale region requests (also caused by the failpoint injection).
regionNum := 20
for i := 1; i < regionNum; i++ {
regionID := uint64(i + 1000)
peerID := regionID + 1
// split regions to [min, b1001), [b1001, b1002), ... [bN, max)
cluster.SplitRaw(regionID-1, regionID, []byte(fmt.Sprintf("b%d", regionID)), []uint64{peerID}, peerID)
}

// Wait until the cdc kv client has issued a request for every region,
// i.e. the mock server has recorded regionNum distinct region IDs.
err = retry.Run(time.Millisecond*200, 20, func() error {
count := 0
requestIDs.Range(func(_, _ interface{}) bool {
count++
return true
})
if count == regionNum {
return nil
}
return errors.Errorf("region number %d is not as expected %d", count, regionNum)
})
c.Assert(err, check.IsNil)

// Send an initialized event and a resolved ts event to each region, using
// the request IDs recorded by the mock server so the replies match.
requestIDs.Range(func(key, value interface{}) bool {
regionID := key.(uint64)
requestID := value.(uint64)
initialized := mockInitializedEvent(regionID, requestID)
ch1 <- initialized
resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID,
RequestId: requestID,
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120},
},
}}
ch1 <- resolved
return true
})

// Expect regionNum*2 events on eventCh, matching the two events sent per
// region above; time out with a test error if the stream stalls.
resolvedCount := 0
checkEvent:
for {
select {
case <-eventCh:
resolvedCount++
if resolvedCount == regionNum*2 {
break checkEvent
}
case <-time.After(time.Second):
c.Errorf("no more events received")
}
}

cancel()
}

0 comments on commit f0f47ae

Please sign in to comment.