Skip to content

Commit

Permalink
kv/client: fix the error handling for io.EOF may miss region reconnect (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Apr 13, 2021
1 parent 0189b11 commit e5e4e1e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 35 deletions.
22 changes: 8 additions & 14 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,16 +1142,14 @@ func (s *eventFeedSession) receiveFromStream(
for {
cevent, err := stream.Recv()

failpoint.Inject("kvClientStreamRecvError", func() {
err = errors.New("injected stream recv error")
})
// TODO: Should we have better way to handle the errors?
if err == io.EOF {
for _, state := range regionStates {
close(state.regionEventCh)
failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
errStr := msg.(string)
if errStr == io.EOF.Error() {
err = io.EOF
} else {
err = errors.New(errStr)
}
return nil
}
})
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
Expand Down Expand Up @@ -1447,12 +1445,8 @@ func (s *eventFeedSession) singleEventFeed(
continue
case event, ok = <-receiverCh:
}
if !ok {
log.Debug("singleEventFeed receiver closed")
return lastResolvedTs, nil
}

if event == nil {
if !ok || event == nil {
log.Debug("singleEventFeed closed by error")
return lastResolvedTs, cerror.ErrEventFeedAborted.GenWithStackByArgs()
}
Expand Down
56 changes: 46 additions & 10 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,11 +1183,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
cancel()
}

// TestStreamSendWithError mainly tests the scenario that the `Recv` call of a gPRC
// stream in kv client meets error, and kv client logs the error and re-establish
// new request
func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
defer testleak.AfterTest(c)()
func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
Expand All @@ -1214,7 +1210,7 @@ func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", failpointStr)
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
Expand Down Expand Up @@ -1304,6 +1300,46 @@ func (s *etcdSuite) TestStreamRecvWithError(c *check.C) {
cancel()
}

// TestStreamSendWithErrorNormal mainly tests the scenario that the `Recv` call
// of a gPRC stream in kv client meets a **logical related** error, and kv client
// logs the error and re-establish new request.
func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
defer testleak.AfterTest(c)()

clientv2 := enableKVClientV2
defer func() {
enableKVClientV2 = clientv2
}()

// test client v2
enableKVClientV2 = true
s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")

// test client v1
enableKVClientV2 = false
s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
}

// TestStreamSendWithErrorIOEOF mainly tests the scenario that the `Recv` call
// of a gPRC stream in kv client meets error io.EOF, and kv client logs the error
// and re-establish new request
func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
defer testleak.AfterTest(c)()

clientv2 := enableKVClientV2
defer func() {
enableKVClientV2 = clientv2
}()

// test client v2
enableKVClientV2 = true
s.testStreamRecvWithError(c, "1*return(\"EOF\")")

// test client v1
enableKVClientV2 = false
s.testStreamRecvWithError(c, "1*return(\"EOF\")")
}

// TestIncompatibleTiKV tests TiCDC new request to TiKV meets `ErrVersionIncompatible`
// error (in fact this error is raised before EventFeed API is really called),
// TiCDC will wait 20s and then retry. This is a common scenario when rolling
Expand Down Expand Up @@ -2321,8 +2357,8 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay", "sleep(500)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvErrorDelay")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientRegionReentrantErrorDelay")
}()
baseAllocatedID := currentRequestID()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
Expand Down Expand Up @@ -2402,7 +2438,7 @@ func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)")
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -2465,7 +2501,7 @@ func (s *etcdSuite) TestClientErrNoPendingRegion(c *check.C) {
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"injected stream recv error\")")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->2*sleep(1000)")
c.Assert(err, check.IsNil)
Expand Down
13 changes: 7 additions & 6 deletions cdc/kv/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,14 @@ func (s *eventFeedSession) receiveFromStreamV2(
worker.inputCh <- nil
}
})
failpoint.Inject("kvClientStreamRecvError", func() {
err = errors.New("injected stream recv error")
failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) {
errStr := msg.(string)
if errStr == io.EOF.Error() {
err = io.EOF
} else {
err = errors.New(errStr)
}
})
if err == io.EOF {
close(worker.inputCh)
return nil
}
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
Expand Down
6 changes: 1 addition & 5 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,9 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case event, ok := <-w.inputCh:
if !ok {
log.Debug("region worker receiver closed")
return nil
}
// event == nil means the region worker should exit and re-establish
// all existing regions.
if event == nil {
if !ok || event == nil {
log.Info("region worker closed by error")
return w.evictAllRegions(ctx)
}
Expand Down

0 comments on commit e5e4e1e

Please sign in to comment.