Skip to content

Commit

Permalink
Merge branch 'release-5.0' into cherry-pick-2308-to-release-5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 19, 2021
2 parents 1d7ffb7 + 03f9f2f commit c9c8580
Show file tree
Hide file tree
Showing 15 changed files with 101 additions and 578 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
linters:
enable:
- unconvert
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ tools/bin/errdoc-gen: tools/check/go.mod
cd tools/check; test -e ../bin/errdoc-gen || \
$(GO) build -o ../bin/errdoc-gen github.com/pingcap/errors/errdoc-gen

tools/bin/golangci-lint: tools/check/go.mod
tools/bin/golangci-lint:
cd tools/check; test -e ../bin/golangci-lint || \
$(GO) build -o ../bin/golangci-lint github.com/golangci/golangci-lint/cmd/golangci-lint
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b ../bin v1.30.0

failpoint-enable: check_failpoint_ctl
$(FAILPOINT_ENABLE)
Expand Down
21 changes: 14 additions & 7 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}

func (w *regionWorker) resolveLock(ctx context.Context) error {
// tikv resolved ts update interval is 1s, use half of the resolve lock interval
// as the lock penalty.
resolveLockPenalty := 10
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
Expand Down Expand Up @@ -300,6 +303,17 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.Duration("duration", sinceLastResolvedTs), zap.Duration("since last event", sinceLastResolvedTs))
return errReconnect
}
// Only resolve lock if the resolved-ts has stayed unchanged for
// at least resolveLockPenalty times.
if rts.ts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.ts.resolvedTs {
rts.ts.resolvedTs = lastResolvedTs
rts.ts.eventTime = time.Now()
rts.ts.penalty = 0
}
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
Expand Down Expand Up @@ -582,13 +596,6 @@ func (w *regionWorker) handleEventEntry(
}
w.metrics.metricPullEventInitializedCounter.Inc()

select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(state.sri.ts)}:
default:
// rtsUpdateCh block often means too many regions are suffering
// lock resolve, the kv client status is not very healthy.
log.Warn("region is not upsert into rts manager", zap.Uint64("region-id", regionID))
}
state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
Expand Down
8 changes: 7 additions & 1 deletion cdc/kv/resolvedts_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type tsItem struct {
sortByEvTime bool
resolvedTs uint64
eventTime time.Time
penalty int
}

func newResolvedTsItem(ts uint64) tsItem {
Expand Down Expand Up @@ -92,9 +93,14 @@ func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// within a single resolved ts manager, the resolved ts of a region must never fall back
if !item.ts.sortByEvTime {
if item.ts.resolvedTs > old.ts.resolvedTs || item.ts.eventTime.After(old.ts.eventTime) {
if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
} else if item.ts.resolvedTs > old.ts.resolvedTs {
old.ts.resolvedTs = item.ts.resolvedTs
old.ts.eventTime = item.ts.eventTime
old.ts.penalty = 0
heap.Fix(&rm.h, old.index)
}
} else {
Expand Down
29 changes: 29 additions & 0 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) {
c.Assert(rts, check.IsNil)
}

// TestRegionTsManagerPenalty exercises the penalty bookkeeping of the region
// ts manager for a single region: repeated upserts that carry the same
// resolved ts are expected to raise the penalty, and an upsert with a larger
// resolved ts is expected to reset it to zero.
func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) {
	defer testleak.AfterTest(c)()

	const (
		region          = 100
		initialTs       = 1000
		advancedTs      = 2000
		unchangedRounds = 6
	)

	manager := newRegionTsManager()
	manager.Upsert(&regionTsInfo{regionID: region, ts: newResolvedTsItem(initialTs)})
	c.Assert(manager.Len(), check.Equals, 1)

	// Re-submit the same resolved ts several times; every round builds a fresh
	// item right before upserting it, and each round should count as one penalty.
	for round := 0; round < unchangedRounds; round++ {
		manager.Upsert(&regionTsInfo{regionID: region, ts: newResolvedTsItem(initialTs)})
	}
	popped := manager.Pop()
	c.Assert(popped.ts.resolvedTs, check.Equals, uint64(initialTs))
	c.Assert(popped.ts.penalty, check.Equals, unchangedRounds)

	// Put the penalized entry back, then advance the resolved ts: the penalty
	// must drop back to zero and the new resolved ts must be kept.
	manager.Upsert(popped)
	manager.Upsert(&regionTsInfo{regionID: region, ts: newResolvedTsItem(advancedTs)})
	popped = manager.Pop()
	c.Assert(popped.ts.penalty, check.DeepEquals, 0)
	c.Assert(popped.ts.resolvedTs, check.DeepEquals, uint64(advancedTs))
}

func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error {
return errors.Annotatef(
cerror.WrapError(cerror.ErrMarshalFailed, err), "Marshal data: %v", data)
}
info.Opts[mark.OptCyclicConfig] = string(cyclicCfg)
info.Opts[mark.OptCyclicConfig] = cyclicCfg
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,19 @@ func ColumnValueString(c interface{}) string {
case int32:
data = strconv.FormatInt(int64(v), 10)
case int64:
data = strconv.FormatInt(int64(v), 10)
data = strconv.FormatInt(v, 10)
case uint8:
data = strconv.FormatUint(uint64(v), 10)
case uint16:
data = strconv.FormatUint(uint64(v), 10)
case uint32:
data = strconv.FormatUint(uint64(v), 10)
case uint64:
data = strconv.FormatUint(uint64(v), 10)
data = strconv.FormatUint(v, 10)
case float32:
data = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
data = strconv.FormatFloat(float64(v), 'f', -1, 64)
data = strconv.FormatFloat(v, 'f', -1, 64)
case string:
data = v
case []byte:
Expand Down
4 changes: 3 additions & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ const (
// CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint.
CDCServiceSafePointID = "ticdc"
// GCSafepointUpdateInterval is the minimal interval at which CDC can update the gc safepoint
GCSafepointUpdateInterval = time.Duration(2 * time.Second)
GCSafepointUpdateInterval = 2 * time.Second
// MinGCSafePointCacheUpdateInterval is the interval at which minGCSafePointCache is updated
MinGCSafePointCacheUpdateInterval = time.Second * 2
)
Expand Down Expand Up @@ -977,6 +977,8 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
ownerMaintainTableNumGauge.DeleteLabelValues(cf.id, capture.AdvertiseAddr, maintainTableTypeWip)
}
delete(o.changeFeeds, job.CfID)
changefeedCheckpointTsGauge.DeleteLabelValues(cf.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(cf.id)
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,12 @@ func (p *oldProcessor) stop(ctx context.Context) error {
log.Warn("an error occurred when stopping the processor", zap.Error(err))
errRes = err
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(0)
resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
return errRes
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if err != nil {
return errors.Trace(err)
}
opts[mark.OptCyclicConfig] = string(cyclicCfg)
opts[mark.OptCyclicConfig] = cyclicCfg
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
Expand Down
2 changes: 1 addition & 1 deletion cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error {
case <-ticker.C:
for _, pdEndpoint := range s.pdEndpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Second*10))
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
req, err := http.NewRequestWithContext(
ctx, http.MethodGet, fmt.Sprintf("%s/health", pdEndpoint), nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ func verifyTables(ctx context.Context, credential *security.Credential, cfg *con
}

for tID, tableName := range snap.CloneTables() {
tableInfo, exist := snap.TableByID(int64(tID))
tableInfo, exist := snap.TableByID(tID)
if !exist {
return nil, nil, errors.NotFoundf("table %d", int64(tID))
return nil, nil, errors.NotFoundf("table %d", tID)
}
if filter.ShouldIgnoreTable(tableName.Schema, tableName.Table) {
continue
Expand Down
7 changes: 5 additions & 2 deletions pkg/retry/retry_with_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration
if temp <= 0 {
temp = 1
}
sleep := temp + rand.Int63n(temp)
backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs)
sleep := (temp + rand.Int63n(temp)) * 3
if sleep <= 0 {
sleep = math.MaxInt64
}
backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep))+backoffBaseInMs)
return time.Duration(backOff) * time.Millisecond
}
1 change: 0 additions & 1 deletion tools/check/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/pingcap/tidb-cdc/_tools
go 1.13

require (
github.com/golangci/golangci-lint v1.33.0
github.com/mgechev/revive v1.0.2
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
mvdan.cc/gofumpt v0.0.0-20201123090407-3077abae40c0
Expand Down
Loading

0 comments on commit c9c8580

Please sign in to comment.