Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/client: don't resolve lock immediately after a region is initialized #2235

Merged
merged 2 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}

func (w *regionWorker) resolveLock(ctx context.Context) error {
// tikv resolved update interval is 1s, use half of the resolck lock interval
// as lock penalty.
resolveLockPenalty := 10
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
Expand Down Expand Up @@ -304,6 +307,17 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.Duration("duration", sinceLastResolvedTs), zap.Duration("since last event", sinceLastResolvedTs))
return errReconnect
}
// Only resolve lock if the resovled-ts keeps unchanged for
// more than resolveLockPenalty times.
if rts.ts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.ts.resolvedTs {
rts.ts.resolvedTs = lastResolvedTs
rts.ts.eventTime = time.Now()
rts.ts.penalty = 0
Comment on lines +314 to +316
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you reset the penalty in func Upsert()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upsert here is a insert operation actually, so we must reset penalty outside

}
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
Expand Down Expand Up @@ -586,13 +600,6 @@ func (w *regionWorker) handleEventEntry(
}
w.metrics.metricPullEventInitializedCounter.Inc()

select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(state.sri.ts)}:
default:
// rtsUpdateCh block often means too many regions are suffering
// lock resolve, the kv client status is not very healthy.
log.Warn("region is not upsert into rts manager", zap.Uint64("region-id", regionID))
}
state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
Expand Down
8 changes: 7 additions & 1 deletion cdc/kv/resolvedts_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type tsItem struct {
sortByEvTime bool
resolvedTs uint64
eventTime time.Time
penalty int
}

func newResolvedTsItem(ts uint64) tsItem {
Expand Down Expand Up @@ -92,9 +93,14 @@ func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// in a single resolved ts manager, the resolved ts of a region should not be fallen back
if !item.ts.sortByEvTime {
if item.ts.resolvedTs > old.ts.resolvedTs || item.ts.eventTime.After(old.ts.eventTime) {
if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
} else if item.ts.resolvedTs > old.ts.resolvedTs {
old.ts.resolvedTs = item.ts.resolvedTs
old.ts.eventTime = item.ts.eventTime
old.ts.penalty = 0
heap.Fix(&rm.h, old.index)
}
} else {
Expand Down
29 changes: 29 additions & 0 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) {
c.Assert(rts, check.IsNil)
}

func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
initRegions := []*regionTsInfo{
{regionID: 100, ts: newResolvedTsItem(1000)},
}
for _, rts := range initRegions {
mgr.Upsert(rts)
}
c.Assert(mgr.Len(), check.Equals, 1)

// test penalty increases if resolved ts keeps unchanged
for i := 0; i < 6; i++ {
rts := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000)}
mgr.Upsert(rts)
}
rts := mgr.Pop()
c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000))
c.Assert(rts.ts.penalty, check.Equals, 6)

// test penalty is cleared to zero if resolved ts is advanced
mgr.Upsert(rts)
rtsNew := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)}
mgr.Upsert(rtsNew)
rts = mgr.Pop()
c.Assert(rts.ts.penalty, check.DeepEquals, 0)
c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000))
}

func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
Expand Down