Commit
Merge branch 'release-4.0-pending' into release-4.0-6d4d4e4faead
amyangfei authored Apr 27, 2021
2 parents 8eaa4d5 + e3742d5 commit d297e09
Showing 11 changed files with 215 additions and 60 deletions.
13 changes: 10 additions & 3 deletions cdc/kv/client.go
@@ -628,12 +628,14 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// goroutine, it won't block the caller of `schedulerRegionRequest`.
s.scheduleDivideRegionAndRequest(ctx, r, sri.ts)
}
case regionspan.LockRangeStatusCancel:
return
default:
panic("unreachable")
}
}

res := s.rangeLock.LockRange(sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer())
res := s.rangeLock.LockRange(ctx, sri.span.Start, sri.span.End, sri.verID.GetID(), sri.verID.GetVer())

if res.Status == regionspan.LockRangeStatusWait {
res = res.WaitFn()
@@ -755,6 +757,7 @@ MainLoop:

state := newRegionFeedState(sri, requestID)
pendingRegions.insert(requestID, state)
failpoint.Inject("kvClientPendingRegionDelay", nil)

stream, ok := s.getStream(rpcCtx.Addr)
// Establish the stream if it has not been connected yet.
@@ -783,8 +786,12 @@ MainLoop:
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
// Delete the pendingRegion info from `pendingRegions` and retry connecting and sending the request.
pendingRegions.take(requestID)
// Take the pendingRegion from `pendingRegions`. If the region has
// already been deleted, we don't retry this region. Otherwise, retry
// connecting and sending the request for this region.
if _, exists := pendingRegions.take(requestID); !exists {
continue MainLoop
}
continue
}
s.addStream(rpcCtx.Addr, stream)
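For readers unfamiliar with the kv client internals, the hunk above hinges on the semantics of `take`: the stream's error path may clean up a pending region (and unlock its range) before the scheduling goroutine gets around to retrying it. A minimal, hypothetical sketch of that take-and-skip pattern (identifiers are illustrative, not the actual ticdc types):

package kvclientsketch

import "sync"

// regionFeedState stands in for the per-region state tracked by the kv client
// (illustrative only).
type regionFeedState struct{ requestID uint64 }

// syncRegionFeedStateMap is a hypothetical stand-in for `pendingRegions`.
type syncRegionFeedStateMap struct {
	mu     sync.Mutex
	states map[uint64]*regionFeedState
}

// take removes and returns the state for requestID, reporting whether it was
// still present. If a concurrent cleanup path (e.g. the stream's receive
// error handler) already took it, the caller must not retry the region,
// otherwise the range lock would be released twice.
func (m *syncRegionFeedStateMap) take(requestID uint64) (*regionFeedState, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	state, ok := m.states[requestID]
	if ok {
		delete(m.states, requestID)
	}
	return state, ok
}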
76 changes: 76 additions & 0 deletions cdc/kv/client_test.go
@@ -2387,3 +2387,79 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
time.Sleep(time.Second)
cancel()
}

// TestClientV1UnlockRangeReentrant tests that clientV1 handles region reconnection
// with an unstable TiKV store correctly. The test workflow is as follows:
// 1. The kv client establishes two region requests, region-1 and region-2, which
// belong to the same TiKV store.
// 2. Region-1 is established first, while region-2 is delayed after its region
// state is inserted into `pendingRegions`.
// 3. At this point the TiKV store crashes and `stream.Recv` returns an error. In
// the defer function of `receiveFromStream`, all pending regions are cleaned up,
// which means the region lock is unlocked once for each of these regions.
// 4. Region-2 from step 2 continues to run. It can't get the store stream, which
// was deleted in step 3, so it creates a new stream, but that fails because of the
// unstable TiKV store. At this point the kv client should handle the pending
// region correctly.
func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

clientv2 := enableKVClientV2
enableKVClientV2 = false
defer func() {
enableKVClientV2 = clientv2
}()

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

cluster := mocktikv.NewCluster()
mvccStore := mocktikv.MustNewMVCCStore()
rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

regionID3 := uint64(3)
regionID4 := uint64(4)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)
cluster.SplitRaw(regionID3, regionID4, []byte("b"), []uint64{5}, 5)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(true)")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay", "1*sleep(0)->1*sleep(2000)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

// wait until the second region is scheduled
time.Sleep(time.Millisecond * 500)
close(ch1)
server1.Stop()
// wait until the kvClientPendingRegionDelay failpoint ends and the second region is processed
time.Sleep(time.Second * 2)
cancel()
wg.Wait()
}
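As a side note, the failpoint term used above, "1*sleep(0)->1*sleep(2000)", chains actions: the first time the failpoint is evaluated it sleeps 0 ms (so region-1 proceeds immediately), and the second time it sleeps 2000 ms (holding region-2 back), which reproduces the race described in the test comment. A minimal, hypothetical sketch of the Enable/Disable round trip (not code from this commit; the test name is made up):

package kvclientsketch

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// TestPendingRegionDelaySketch illustrates the failpoint wiring used by the
// test above. The failpoint.Inject marker itself (see the client.go hunk
// earlier in this commit) is a no-op until the binary is built with
// failpoints enabled, at which point the term below applies.
func TestPendingRegionDelaySketch(t *testing.T) {
	fp := "github.com/pingcap/ticdc/cdc/kv/kvClientPendingRegionDelay"
	// Term syntax: "<count>*<action>(arg)" chained with "->".
	// First hit sleeps 0 ms (region-1 proceeds); second hit sleeps 2000 ms
	// (region-2 is delayed).
	if err := failpoint.Enable(fp, "1*sleep(0)->1*sleep(2000)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = failpoint.Disable(fp)
	}()
}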
2 changes: 1 addition & 1 deletion cdc/sink/cdclog/file.go
@@ -87,7 +87,7 @@ func (ts *tableStream) isEmpty() bool {
}

func (ts *tableStream) shouldFlush() bool {
return ts.sendSize.Load() > maxPartFlushSize
return ts.sendSize.Load() > maxRowFileSize
}

func (ts *tableStream) flush(ctx context.Context, sink *logSink) error {
31 changes: 23 additions & 8 deletions cdc/sink/cdclog/s3.go
@@ -37,7 +37,7 @@ const (
maxCompletePartSize = 100 << 20 // rotate the row changed event file if one complete file is larger than 100MB
maxDDLFlushSize = 10 << 20 // rotate the ddl event file if one complete file is larger than 10MB

defaultBufferChanSize = 20480
defaultBufferChanSize = 1280000
defaultFlushRowChangedEventDuration = 5 * time.Second // TODO make it as a config
)

@@ -260,11 +260,6 @@ func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
s.ddlEncoder = s.encoder()
firstCreated = true
}
_, err := s.ddlEncoder.EncodeDDLEvent(ddl)
if err != nil {
return err
}
data := s.ddlEncoder.MixedBuild(firstCreated)
// reset encoder buf for next round append
defer s.ddlEncoder.Reset()

@@ -277,7 +272,7 @@ func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
SubDir: ddlEventsDir,
ListCount: 1,
}
err = s.storage.WalkDir(ctx, opt, func(key string, fileSize int64) error {
err := s.storage.WalkDir(ctx, opt, func(key string, fileSize int64) error {
log.Debug("[EmitDDLEvent] list content from s3",
zap.String("key", key),
zap.Int64("size", size),
@@ -289,7 +284,27 @@ func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if err != nil {
return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err)
}
if size == 0 || size > maxDDLFlushSize {

// only on reboot and when (size == 0 or size >= maxDDLFlushSize) should we add the version part to s3
withVersion := firstCreated && (size == 0 || size >= maxDDLFlushSize)

// clean the ddlEncoder version part:
// if we reboot cdc and size is between (0, maxDDLFlushSize), we should skip the version part in
// JSONEventBatchEncoder.keyBuf; the JSONEventBatchEncoder constructor has
// already filled in the version part, see NewJSONEventBatchEncoder and
// JSONEventBatchEncoder.MixedBuild
if firstCreated && size > 0 && size < maxDDLFlushSize {
s.ddlEncoder.Reset()
}

_, er := s.ddlEncoder.EncodeDDLEvent(ddl)
if er != nil {
return er
}

data := s.ddlEncoder.MixedBuild(withVersion)

if size == 0 || size >= maxDDLFlushSize {
// no ddl file exists, or the
// existing file is oversized; we should generate a new file
fileData = data
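The version-header decision in the hunk above boils down to: prepend the version only right after a restart, and only when a brand-new DDL file is about to be written. A minimal, hypothetical distillation (this helper does not exist in the codebase; the threshold mirrors the diff):

package cdclogsketch

// maxDDLFlushSize mirrors the constant in cdc/sink/cdclog/s3.go: rotate the
// DDL event file once it exceeds 10MB.
const maxDDLFlushSize = 10 << 20

// shouldPrependVersion reports whether the encoded DDL batch should carry the
// version header. firstCreated means the encoder was just (re)created after a
// restart; existingSize is the size of the newest DDL file already in S3.
// Only when a brand-new file will be written (no file yet, or the existing
// one is about to be rotated) does the version header belong at the front.
func shouldPrependVersion(firstCreated bool, existingSize int64) bool {
	return firstCreated && (existingSize == 0 || existingSize >= maxDDLFlushSize)
}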
2 changes: 1 addition & 1 deletion cdc/sink/codec/json.go
@@ -38,7 +38,7 @@ const (
// DefaultMaxMessageBytes sets the default value for max-message-bytes
DefaultMaxMessageBytes int = 64 * 1024 * 1024 // 64M
// DefaultMaxBatchSize sets the default value for max-batch-size
DefaultMaxBatchSize int = 4096
DefaultMaxBatchSize int = 16
)

type column struct {
6 changes: 3 additions & 3 deletions cdc/sink/codec/json_test.go
@@ -226,7 +226,7 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, 4096)
c.Assert(encoder.maxBatchSize, check.Equals, 16)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, 64*1024*1024)

err = encoder.SetParams(map[string]string{"max-message-bytes": "0"})
@@ -237,12 +237,12 @@

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, 4096)
c.Assert(encoder.maxBatchSize, check.Equals, 16)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxInt32)

err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, 4096)
c.Assert(encoder.maxBatchSize, check.Equals, 16)
c.Assert(encoder.maxKafkaMessageSize, check.Equals, math.MaxUint32)

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
4 changes: 2 additions & 2 deletions cdc/sink/simple_mysql_tester.go
@@ -131,10 +131,10 @@ func (s *simpleMySQLSink) executeRowChangedEvents(ctx context.Context, rows ...*
return errors.Trace(err)
}
}
sql, args = prepareUpdate(row.Table.QuoteString(), row.PreColumns, row.Columns, true)
sql, args = prepareReplace(row.Table.QuoteString(), row.Columns, true, false /* translateToInsert */)
} else if len(row.PreColumns) == 0 {
// insert
sql, args = prepareReplace(row.Table.QuoteString(), row.Columns, true, true)
sql, args = prepareReplace(row.Table.QuoteString(), row.Columns, true, false /* translateToInsert */)
} else if len(row.Columns) == 0 {
// delete
if s.enableCheckOldValue {
9 changes: 6 additions & 3 deletions cmd/client_changefeed.go
@@ -475,7 +475,7 @@ func newUpdateChangefeedCommand() *cobra.Command {
Use: "update",
Short: "Update config of an existing replication task (changefeed)",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, args []string) (err error) {
ctx := defaultContext

old, err := cdcEtcdCli.GetChangeFeedInfo(ctx, changefeedID)
@@ -495,8 +495,8 @@ func newUpdateChangefeedCommand() *cobra.Command {
info.SinkURI = sinkURI
case "config":
cfg := info.Config
if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
log.Panic("decode config file error", zap.Error(err))
if err = strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
log.Error("decode config file error", zap.Error(err))
}
case "opts":
for _, opt := range opts {
@@ -538,6 +538,9 @@
log.Warn("unsupported flag, please report a bug", zap.String("flagName", flag.Name))
}
})
if err != nil {
return err
}

resp, err := applyOwnerChangefeedQuery(ctx, changefeedID, getCredential())
// if no cdc owner exists, allow user to update changefeed config
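The point of switching RunE to a named return and using `err =` inside the closure above is that `err :=` would shadow the outer variable, so a config-decoding failure inside cmd.Flags().Visit would be silently dropped. A minimal, hypothetical illustration of the pitfall (names are made up):

package cmdsketch

import "errors"

var errDecode = errors.New("decode config file error")

// runUpdate mimics the RunE structure above: the closure assigns to the named
// return value, and the error is checked after the closure has run.
func runUpdate(decodeFails bool) (err error) {
	visit := func() {
		// "err = ..." writes to the outer named return; "err := ..." would
		// declare a new, shadowed variable and the failure would never
		// reach the caller.
		if decodeFails {
			err = errDecode
		}
	}
	visit()
	if err != nil {
		return err
	}
	return nil
}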
33 changes: 23 additions & 10 deletions pkg/pipeline/pipeline_test.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
)
@@ -281,7 +282,7 @@ func (n *throwNode) Receive(ctx NodeContext) error {
}

func (n *throwNode) Destroy(ctx NodeContext) error {
n.c.Assert(n.index, check.Equals, 6)
n.c.Assert(map[int]bool{4: true, 6: true}, check.HasKey, n.index)
return nil
}

@@ -302,22 +303,34 @@ func (s *pipelineSuite) TestPipelineThrow(c *check.C) {
},
}))
c.Assert(err, check.IsNil)
// this line may be return an error because the pipeline maybe closed before this line was executed
//nolint:errcheck
p.SendToFirstNode(PolymorphicEventMessage(&model.PolymorphicEvent{
// whether err is nil is not deterministic.
// If we add some delay here, such as sleeping 50ms, there is a higher
// probability that the second message is not sent.
// time.Sleep(time.Millisecond * 50)
err = p.SendToFirstNode(PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Schema: "I am built by test function",
Table: "DD2",
},
},
}))
errs := p.Wait()
c.Assert(len(errs), check.Equals, 4)
c.Assert(errs[0].Error(), check.Equals, "error node throw an error, index: 3")
c.Assert(errs[1].Error(), check.Equals, "error node throw an error, index: 4")
c.Assert(errs[2].Error(), check.Equals, "error node throw an error, index: 5")
c.Assert(errs[3].Error(), check.Equals, "error node throw an error, index: 6")
if err != nil {
// pipeline closed before the second message was sent
c.Assert(cerror.ErrSendToClosedPipeline.Equal(err), check.IsTrue)
errs := p.Wait()
c.Assert(len(errs), check.Equals, 2)
c.Assert(errs[0].Error(), check.Equals, "error node throw an error, index: 3")
c.Assert(errs[1].Error(), check.Equals, "error node throw an error, index: 4")
} else {
// the second message was sent before pipeline closed
errs := p.Wait()
c.Assert(len(errs), check.Equals, 4)
c.Assert(errs[0].Error(), check.Equals, "error node throw an error, index: 3")
c.Assert(errs[1].Error(), check.Equals, "error node throw an error, index: 4")
c.Assert(errs[2].Error(), check.Equals, "error node throw an error, index: 5")
c.Assert(errs[3].Error(), check.Equals, "error node throw an error, index: 6")
}
}

func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) {
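The branching in the updated test above exists because SendToFirstNode races with the pipeline shutting down after the thrown errors. A minimal, hypothetical sketch of why a send can fail once the pipeline has closed (this is not the actual pipeline implementation):

package pipelinesketch

import "errors"

// errSendToClosedPipeline plays the role of cerror.ErrSendToClosedPipeline.
var errSendToClosedPipeline = errors.New("pipeline is closed, cannot send message")

type pipeline struct {
	closed chan struct{} // closed once an error node shuts the pipeline down
	msgs   chan string
}

// sendToFirstNode either enqueues the message or, if the pipeline has already
// been closed by an earlier error, returns a typed error instead of blocking.
// This is why the test must accept both outcomes for the second message.
func (p *pipeline) sendToFirstNode(msg string) error {
	select {
	case <-p.closed:
		return errSendToClosedPipeline
	case p.msgs <- msg:
		return nil
	}
}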
11 changes: 9 additions & 2 deletions pkg/regionspan/region_range_lock.go
@@ -15,6 +15,7 @@ package regionspan

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
@@ -286,7 +287,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
}

// LockRange locks a range with specified version.
func (l *RegionRangeLock) LockRange(startKey, endKey []byte, regionID, version uint64) LockRangeResult {
func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte, regionID, version uint64) LockRangeResult {
res, signalChs := l.tryLockRange(startKey, endKey, regionID, version)

if res.Status != LockRangeStatusWait {
@@ -298,7 +299,11 @@ func (l *RegionRangeLock) LockRange(startKey, endKey []byte, regionID, version u
var res1 LockRangeResult
for {
for _, ch := range signalChs1 {
<-ch
select {
case <-ctx.Done():
return LockRangeResult{Status: LockRangeStatusCancel}
case <-ch:
}
}
res1, signalChs1 = l.tryLockRange(startKey, endKey, regionID, version)
if res1.Status != LockRangeStatusWait {
@@ -374,6 +379,8 @@ const (
LockRangeStatusWait = 1
// LockRangeStatusStale means a LockRange operation is rejected because of the range's version is stale.
// LockRangeStatusStale means a LockRange operation is rejected because the range's version is stale.
// LockRangeStatusCancel means a LockRange operation is cancelled.
LockRangeStatusCancel = 3
)

// LockRangeResult represents the result of LockRange method of RegionRangeLock.
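The new context parameter and LockRangeStatusCancel status above turn the previously unbounded wait into a cancellable one. A minimal, hypothetical sketch of that wait loop (simplified; the real LockRange re-runs tryLockRange with fresh signal channels each round):

package regionlocksketch

import "context"

// Status codes mirroring the LockRangeStatus* constants in the diff
// (values are illustrative).
const (
	lockRangeStatusSuccess = 0
	lockRangeStatusCancel  = 3
)

// waitForSignals blocks until every signal channel fires or ctx is cancelled.
// Returning a dedicated cancel status lets the caller (scheduleRegionRequest)
// simply stop scheduling the region instead of hitting the "unreachable" panic.
func waitForSignals(ctx context.Context, signalChs []<-chan struct{}) int {
	for _, ch := range signalChs {
		select {
		case <-ctx.Done():
			return lockRangeStatusCancel
		case <-ch:
		}
	}
	return lockRangeStatusSuccess
}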