Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc/kvclient: make multiple cdc kv clients share one RegionCache #3464

Merged
merged 10 commits into from
Nov 26, 2021
18 changes: 14 additions & 4 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
Expand All @@ -55,10 +56,11 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache

cancel context.CancelFunc

Expand Down Expand Up @@ -104,6 +106,10 @@ func (c *Capture) reset(ctx context.Context) error {
c.grpcPool.Close()
}
c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.pdClient)
log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
return nil
}
Expand Down Expand Up @@ -153,6 +159,7 @@ func (c *Capture) run(stdCtx context.Context) error {
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
})
err := c.register(ctx)
if err != nil {
Expand Down Expand Up @@ -353,6 +360,9 @@ func (c *Capture) AsyncClose() {
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.regionCache != nil {
c.regionCache.Close()
}
}

// WriteDebugInfo writes the debug info into writer.
Expand Down
7 changes: 3 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ var NewCDCKVClient func(
pd pd.Client,
kvStorage tikv.Storage,
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
) CDCKVClient = NewCDCClient

// CDCClient to get events from TiKV
Expand All @@ -308,7 +309,7 @@ type CDCClient struct {
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) {
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
Expand All @@ -326,16 +327,14 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp
pd: pd,
kvStorage: store,
grpcPool: grpcPool,
regionCache: tikv.NewRegionCache(pd),
regionCache: regionCache,
regionLimiters: defaultRegionEventFeedLimiters,
}
return
}

// Close CDCClient
func (c *CDCClient) Close() error {
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
c.regionCache.Close()

return nil
}

Expand Down
8 changes: 6 additions & 2 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -280,7 +282,9 @@ func prepareBench(b *testing.B, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down
105 changes: 78 additions & 27 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func (s *clientSuite) TestNewClose(c *check.C) {

grpcPool := NewGrpcPoolImpl(context.Background(), &security.Credential{})
defer grpcPool.Close()
cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, regionCache)
err = cli.Close()
c.Assert(err, check.IsNil)
}
Expand Down Expand Up @@ -350,7 +352,9 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(context.Background(), pdClient, kvStorage, grpcPool, regionCache)
defer cdcClient.Close() //nolint:errcheck
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
Expand Down Expand Up @@ -449,7 +453,9 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -547,7 +553,9 @@ func (s *clientSuite) TestHandleError(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -704,7 +712,9 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
var wg2 sync.WaitGroup
wg2.Add(1)
Expand Down Expand Up @@ -772,8 +782,9 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {

grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()

cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)

var wg2 sync.WaitGroup
Expand Down Expand Up @@ -839,7 +850,9 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1288,7 +1301,9 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1398,7 +1413,9 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 40)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1527,7 +1544,9 @@ func (s *clientSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1739,7 +1758,9 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1814,7 +1835,9 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)

wg.Add(1)
Expand Down Expand Up @@ -1891,7 +1914,9 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2000,7 +2025,9 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2099,7 +2126,9 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
var clientWg sync.WaitGroup
clientWg.Add(1)
Expand Down Expand Up @@ -2247,7 +2276,9 @@ func (s *clientSuite) testEventAfterFeedStop(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2426,7 +2457,9 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2657,7 +2690,9 @@ func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2751,7 +2786,9 @@ func (s *clientSuite) TestFailRegionReentrant(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2832,7 +2869,9 @@ func (s *clientSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2898,7 +2937,9 @@ func (s *clientSuite) testClientErrNoPendingRegion(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2974,7 +3015,9 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3123,7 +3166,9 @@ func (s *clientSuite) TestConcurrentProcessRangeRequest(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3238,7 +3283,9 @@ func (s *clientSuite) TestEvTimeUpdate(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3357,7 +3404,9 @@ func (s *clientSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) {
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3448,7 +3497,9 @@ func (s *clientSuite) TestPrewriteNotMatchError(c *check.C) {
lockResolver := txnutil.NewLockerResolver(kvStorage)
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 10)
baseAllocatedID := currentRequestID()

Expand Down
Loading