Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3464
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
zhaoxinyu authored and ti-chi-bot committed Nov 26, 2021
1 parent 3e04da8 commit 3731f7c
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 71 deletions.
42 changes: 42 additions & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
Expand All @@ -54,10 +55,21 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

<<<<<<< HEAD
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
=======
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
TimeAcquirer pdtime.TimeAcquirer

tableActorSystem *system.System
>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))

cancel context.CancelFunc

Expand Down Expand Up @@ -102,6 +114,10 @@ func (c *Capture) reset(ctx context.Context) error {
c.grpcPool.Close()
}
c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.pdClient)
log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
return nil
}
Expand Down Expand Up @@ -146,11 +162,22 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
<<<<<<< HEAD
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
=======
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
TimeAcquirer: c.TimeAcquirer,
TableActorSystem: c.tableActorSystem,
>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
})
err := c.register(ctx)
if err != nil {
Expand Down Expand Up @@ -335,6 +362,7 @@ func (c *Capture) register(ctx cdcContext.Context) error {
}

// AsyncClose closes the capture by unregistering it from etcd
// Note: this function should be reentrant
func (c *Capture) AsyncClose() {
defer c.cancel()
c.OperateOwnerUnderLock(func(o *owner.Owner) error { //nolint:errcheck
Expand All @@ -349,6 +377,20 @@ func (c *Capture) AsyncClose() {
if c.grpcPool != nil {
c.grpcPool.Close()
}
<<<<<<< HEAD
=======
if c.regionCache != nil {
c.regionCache.Close()
c.regionCache = nil
}
if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
if err != nil {
log.Warn("stop table actor system failed", zap.Error(err))
}
c.tableActorSystem = nil
}
>>>>>>> e46ded913 (ticdc/kvclient: make multiple cdc kv clients share one RegionCache (#3464))
}

// WriteDebugInfo writes the debug info into writer.
Expand Down
12 changes: 2 additions & 10 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ type CDCKVClient interface {
isPullerInit PullerInitialization,
eventCh chan<- model.RegionFeedEvent,
) error
Close() error
}

// NewCDCKVClient is the constructor of CDC KV client
Expand All @@ -308,7 +307,7 @@ type CDCClient struct {
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool) (c CDCKVClient) {
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
Expand All @@ -326,19 +325,12 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp
pd: pd,
kvStorage: store,
grpcPool: grpcPool,
regionCache: tikv.NewRegionCache(pd),
regionCache: regionCache,
regionLimiters: defaultRegionEventFeedLimiters,
}
return
}

// Close CDCClient
func (c *CDCClient) Close() error {
c.regionCache.Close()

return nil
}

func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter {
return c.regionLimiters.getLimiter(regionID)
}
Expand Down
10 changes: 6 additions & 4 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,16 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

Expand Down Expand Up @@ -279,15 +280,16 @@ func prepareBench(b *testing.B, regionNum int) (
isPullInit := &mockPullerInit{}
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

Expand Down
Loading

0 comments on commit 3731f7c

Please sign in to comment.