cdc,retry: fix leader missing by extending region retry duration (pin…
overvenus committed Jun 21, 2022
1 parent dbf79fe commit 3a725a6
Showing 28 changed files with 310 additions and 222 deletions.
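The central change is in how the KV client retries region scans: instead of giving up after a fixed number of attempts, retries are now bounded by a configurable total duration, so a region has time to recover from transient states such as a missing leader during a leader transfer. The standalone Go sketch below (not the tiflow retry package; names and values are illustrative) shows the duration-bounded retry pattern the diffs below adopt.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithTotalDuration keeps calling fn until it succeeds, returns a
// non-retryable error, the total retry budget is spent, or ctx is cancelled.
// Backoff grows exponentially but is capped at maxDelay.
func retryWithTotalDuration(
	ctx context.Context,
	total, baseDelay, maxDelay time.Duration,
	isRetryable func(error) bool,
	fn func() error,
) error {
	deadline := time.Now().Add(total)
	delay := baseDelay
	for {
		err := fn()
		if err == nil || !isRetryable(err) {
			return err
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("retry budget %s exhausted: %w", total, err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
}

func main() {
	attempts := 0
	err := retryWithTotalDuration(
		context.Background(),
		2*time.Second,        // total retry duration (configurable in the real client)
		10*time.Millisecond,  // base backoff delay
		500*time.Millisecond, // max backoff delay
		func(error) bool { return true },
		func() error {
			attempts++
			if attempts < 4 {
				return errors.New("leader missing") // transient error
			}
			return nil
		},
	)
	fmt.Println("attempts:", attempts, "err:", err)
}
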
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage.go
@@ -795,7 +795,7 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema
logTime = now
}
return err
- }, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable))
+ }, retry.WithBackoffBaseDelay(10), retry.WithIsRetryableErr(isRetryable))

return snap, err
}
44 changes: 21 additions & 23 deletions cdc/kv/client.go
@@ -293,7 +293,6 @@ type CDCKVClient interface {
ctx context.Context,
span regionspan.ComparableSpan,
ts uint64,
- enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- model.RegionFeedEvent,
@@ -307,6 +306,7 @@ var NewCDCKVClient = NewCDCClient
type CDCClient struct {
pd pd.Client

+ config *config.KVClientConfig
clusterID uint64

grpcPool GrpcPool
@@ -318,7 +318,14 @@ type CDCClient struct {
}

// NewCDCClient creates a CDCClient instance
- func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) {
+ func NewCDCClient(
+ 	ctx context.Context,
+ 	pd pd.Client,
+ 	kvStorage tikv.Storage,
+ 	grpcPool GrpcPool,
+ 	regionCache *tikv.RegionCache,
+ 	cfg *config.KVClientConfig,
+ ) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
@@ -333,6 +340,7 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp

c = &CDCClient{
clusterID: clusterID,
+ config: cfg,
pd: pd,
kvStorage: store,
grpcPool: grpcPool,
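
A supporting change, visible in the two hunks above: CDCClient now carries a *config.KVClientConfig injected through NewCDCClient instead of each event feed session reading config.GetGlobalServerConfig(). A minimal sketch of that injection pattern follows (hypothetical type names, not the tiflow API); it mainly makes the dependency explicit and lets tests and benchmarks pass a tailored config.

package main

import (
	"fmt"
	"time"
)

// kvClientConfig stands in for the real *config.KVClientConfig.
type kvClientConfig struct {
	RegionScanLimit     int
	RegionRetryDuration time.Duration
}

// client mirrors the shape of the change: configuration is a constructor
// argument and a struct field, not a global looked up per session.
type client struct {
	config *kvClientConfig
}

func newClient(cfg *kvClientConfig) *client {
	return &client{config: cfg}
}

func main() {
	// Tests and benchmarks can pass a tailored config directly.
	c := newClient(&kvClientConfig{RegionScanLimit: 40, RegionRetryDuration: time.Minute})
	fmt.Println(c.config.RegionScanLimit, c.config.RegionRetryDuration)
}
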
@@ -393,14 +401,12 @@ type PullerInitialization interface {
// The `Start` and `End` field in input span must be memcomparable encoded.
func (c *CDCClient) EventFeed(
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
- enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span,
- 	lockResolver, isPullerInit,
- 	enableOldValue, ts, eventCh)
+ 	lockResolver, isPullerInit, ts, eventCh)
return s.eventFeed(ctx, ts)
}

@@ -441,8 +447,7 @@ type eventFeedSession struct {
// The queue is used to store region that reaches limit
rateLimitQueue []regionErrorInfo

- rangeLock      *regionspan.RegionRangeLock
- enableOldValue bool
+ rangeLock *regionspan.RegionRangeLock

// To identify metrics of different eventFeedSession
id string
@@ -468,25 +473,22 @@ func newEventFeedSession(
totalSpan regionspan.ComparableSpan,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
- enableOldValue bool,
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
- kvClientCfg := config.GetGlobalServerConfig().KVClient
return &eventFeedSession{
client: client,
regionCache: regionCache,
kvStorage: kvStorage,
totalSpan: totalSpan,
eventCh: eventCh,
- regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit),
+ regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit),
regionCh: make(chan singleRegionInfo, defaultRegionChanSize),
errCh: make(chan regionErrorInfo, defaultRegionChanSize),
requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize),
rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize),
rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs),
- enableOldValue: enableOldValue,
lockResolver: lockResolver,
isPullerInit: isPullerInit,
id: id,
@@ -693,11 +695,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
requestID := allocID()

- extraOp := kvrpcpb.ExtraOp_Noop
- if s.enableOldValue {
- 	extraOp = kvrpcpb.ExtraOp_ReadOldValue
- }
-
+ // Always read old value.
+ extraOp := kvrpcpb.ExtraOp_ReadOldValue
rpcCtx := sri.rpcCtx
regionID := rpcCtx.Meta.GetId()
req := &cdcpb.ChangeDataRequest{
@@ -897,6 +896,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
nextSpan := span
captureAddr := util.CaptureAddrFromCtx(ctx)

+ // Max backoff 500ms.
+ scanRegionMaxBackoff := int64(500)
for {
var (
regions []*tikv.Region
@@ -926,7 +927,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
}
log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas))
return nil
- }, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError))
+ }, retry.WithBackoffMaxDelay(scanRegionMaxBackoff),
+ 	retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
if retryErr != nil {
return retryErr
}
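
For scale, the old options bounded this scan-region retry to at most 100 tries with backoff capped at 50ms, roughly a few seconds of waiting before the error was returned; the new options cap backoff at 500ms and keep retrying until the configurable region retry duration is spent. A back-of-the-envelope comparison (a sketch; the 60s budget is hypothetical, not the shipped default):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Old: retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100).
	// Waiting time is bounded by roughly tries * max delay.
	oldUpperBound := 100 * 50 * time.Millisecond // about 5s

	// New: retry.WithBackoffMaxDelay(scanRegionMaxBackoff /* 500ms */) plus a
	// total retry duration taken from the KV client config.
	regionRetryDuration := 60 * time.Second // hypothetical configured value

	fmt.Printf("old upper bound ~%v, new budget = %v\n", oldUpperBound, regionRetryDuration)
}
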
@@ -1330,7 +1332,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

- func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (model.RegionFeedEvent, error) {
+ func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
@@ -1350,14 +1352,10 @@ func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bo
StartTs: entry.StartTs,
CRTs: entry.CommitTs,
RegionID: regionID,
+ OldValue: entry.GetOldValue(),
},
}

- // when old-value is disabled, it is still possible for the tikv to send a event containing the old value
- // we need avoid a old-value sent to downstream when old-value is disabled
- if enableOldValue {
- 	revent.Val.OldValue = entry.GetOldValue()
- }
return revent, nil
}
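
The comment removed above explained why the KV client used to strip old values when the feature was disabled. After this change the KV client always requests (ExtraOp_ReadOldValue) and forwards old values; if per-changefeed filtering is still needed, it presumably happens further downstream and is not part of this excerpt. A purely hypothetical sketch of such a downstream filter, not code from this commit:

package main

import "fmt"

// RowEvent stands in for the row value carried by model.RegionFeedEvent.
type RowEvent struct {
	Key, Value, OldValue []byte
}

// dropOldValueIfDisabled clears OldValue when the feature is off, so the
// capture layer can stay unconditional while downstream stays configurable.
func dropOldValueIfDisabled(e RowEvent, enableOldValue bool) RowEvent {
	if !enableOldValue {
		e.OldValue = nil
	}
	return e
}

func main() {
	e := RowEvent{Key: []byte("k"), Value: []byte("v2"), OldValue: []byte("v1")}
	fmt.Printf("old value after filter: %q\n", dropOldValueIfDisabled(e, false).OldValue)
}
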

9 changes: 5 additions & 4 deletions cdc/kv/client_bench_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
@@ -192,11 +193,11 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
- cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
+ cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
- err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
+ err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
@@ -283,11 +284,11 @@ func prepareBench(b *testing.B, regionNum int) (
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
- cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
+ cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
- err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
+ err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
(Diffs for the remaining 25 changed files are not shown.)
