Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Jun 10, 2021
1 parent b6d58ef commit fc4adf4
Show file tree
Hide file tree
Showing 16 changed files with 1,111 additions and 170 deletions.
26 changes: 22 additions & 4 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ const (

// defines the scan region limit for each table
regionScanLimitPerTable = 40

// time interval to force kv client to terminate gRPC stream and reconnect
reconnectInterval = 15 * time.Minute
)

// time interval to force kv client to terminate gRPC stream and reconnect
var reconnectInterval = 15 * time.Minute

// hard code switch
// true: use kv client v2, which has a region worker for each stream
// false: use kv client v1, which runs a goroutine for every single region
var enableKVClientV2 = false
var enableKVClientV2 = true

type singleRegionInfo struct {
verID tikv.RegionVerID
Expand Down Expand Up @@ -177,6 +177,24 @@ func (s *regionFeedState) isStopped() bool {
return atomic.LoadInt32(&s.stopped) > 0
}

func (s *regionFeedState) isInitialized() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.initialized
}

func (s *regionFeedState) getLastResolvedTs() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastResolvedTs
}

func (s *regionFeedState) getRegionSpan() regionspan.ComparableSpan {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.span
}

type syncRegionFeedStateMap struct {
mu *sync.Mutex
regionInfoMap map[uint64]*regionFeedState
Expand Down
48 changes: 46 additions & 2 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,34 @@ func benchmarkSingleWorkerResolvedTs(b *testing.B, clientV2 bool) {
}
}

func benchmarkResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
InitWorkerPool()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
}

func BenchmarkResolvedTsClientV1(b *testing.B) {
benchmarkSingleWorkerResolvedTs(b, false /* clientV1 */)
}

func BenchmarkResolvedTsClientV2(b *testing.B) {
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
benchmarkResolvedTsClientV2(b)
}

func BenchmarkResolvedTsClientV2WorkerPool(b *testing.B) {
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 10000
regionWorkerLowWatermark = 2000
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
benchmarkResolvedTsClientV2(b)
}

func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
Expand Down Expand Up @@ -489,10 +511,32 @@ func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
}
}

func benchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
InitWorkerPool()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
}

func BenchmarkMultiStoreResolvedTsClientV1(b *testing.B) {
benchmarkMultipleStoreResolvedTs(b, false /* clientV1 */)
}

func BenchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
benchmarkMultiStoreResolvedTsClientV2(b)
}

func BenchmarkMultiStoreResolvedTsClientV2WorkerPool(b *testing.B) {
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 1000
regionWorkerLowWatermark = 200
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
benchmarkMultiStoreResolvedTsClientV2(b)
}
Loading

0 comments on commit fc4adf4

Please sign in to comment.