Skip to content

Commit

Permalink
*: fix changefeed checkpoint lag negative value error (#3013) (#3535)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 8, 2021
1 parent 632f53e commit 636c0f5
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 93 deletions.
36 changes: 24 additions & 12 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
Expand All @@ -54,12 +55,12 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool

cancel context.CancelFunc
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func(pd.Client) *owner.Owner
Expand Down Expand Up @@ -99,6 +100,12 @@ func (c *Capture) reset(ctx context.Context) error {
}
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
}
Expand Down Expand Up @@ -147,11 +154,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TimeAcquirer: c.TimeAcquirer,
})
err := c.register(ctx)
if err != nil {
Expand All @@ -165,7 +173,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(3)
wg.Add(4)
var ownerErr, processorErr error
go func() {
defer wg.Done()
Expand All @@ -187,6 +195,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
Expand Down
13 changes: 6 additions & 7 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package owner
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -178,7 +177,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
return errors.Trace(err)
}
if shouldUpdateState {
c.updateStatus(barrierTs)
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
c.updateStatus(currentTs, barrierTs)
}
return nil
}
Expand Down Expand Up @@ -438,7 +439,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(barrierTs model.Ts) {
func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
resolvedTs := barrierTs
for _, position := range c.state.TaskPositions {
if resolvedTs > position.ResolvedTs {
Expand Down Expand Up @@ -470,12 +471,10 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) {
}
return status, changed, nil
})

phyTs := oracle.ExtractPhysical(checkpointTs)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
// It is more accurate to get tso from PD, but in most cases since we have
// deployed NTP service, a little bias is acceptable here.
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
}

func (c *changefeed) Close() {
Expand Down
2 changes: 2 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
Expand Down Expand Up @@ -217,6 +218,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)
if err := m.handleCommand(); err != nil {
return state, err
}

captureID := ctx.GlobalVars().CaptureInfo.ID
var inactiveChangefeedCount int
for changefeedID, changefeedState := range globalState.Changefeeds {
Expand Down
29 changes: 15 additions & 14 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math"
"strconv"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -154,7 +153,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if !p.checkChangefeedNormal() {
return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs()
}
if skip := p.checkPosition(); skip {
// we should skip this tick after create a task position
if p.createTaskPosition() {
return p.changefeed, nil
}
if err := p.handleErrorCh(ctx); err != nil {
Expand All @@ -169,7 +169,12 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if err := p.checkTablesNum(ctx); err != nil {
return nil, errors.Trace(err)
}
p.handlePosition()

// it is no need to check the err here, because we will use
// local time when an error return, which is acceptable
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()

p.handlePosition(oracle.GetPhysical(pdTime))
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage(ctx)
Expand All @@ -187,10 +192,10 @@ func (p *processor) checkChangefeedNormal() bool {
return true
}

// checkPosition create a new task position, and put it into the etcd state.
// task position maybe be not exist only when the processor is running first time.
func (p *processor) checkPosition() (skipThisTick bool) {
if p.changefeed.TaskPositions[p.captureInfo.ID] != nil {
// createTaskPosition will create a new task position if a task position does not exist.
// task position not exist only when the processor is running first in the first tick.
func (p *processor) createTaskPosition() (skipThisTick bool) {
if _, exist := p.changefeed.TaskPositions[p.captureInfo.ID]; exist {
return false
}
if p.initialized {
Expand Down Expand Up @@ -552,7 +557,7 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error {
}

// handlePosition calculates the local resolved ts and local checkpoint ts
func (p *processor) handlePosition() {
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
Expand All @@ -573,15 +578,11 @@ func (p *processor) handlePosition() {
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-resolvedPhyTs) / 1e3)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-checkpointPhyTs) / 1e3)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))

// minResolvedTs and minCheckpointTs may less than global resolved ts and global checkpoint ts when a new table added, the startTs of the new table is less than global checkpoint ts.
Expand Down
14 changes: 9 additions & 5 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"log"
"time"

"github.com/pingcap/ticdc/pkg/pdtime"

"github.com/pingcap/ticdc/pkg/version"

"github.com/pingcap/ticdc/cdc/kv"
Expand All @@ -33,11 +35,12 @@ import (
// the lifecycle of vars in the GlobalVars should be aligned with the ticdc server process.
// All field in Vars should be READ-ONLY and THREAD-SAFE
type GlobalVars struct {
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *kv.CDCEtcdClient
GrpcPool kv.GrpcPool
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *kv.CDCEtcdClient
GrpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
}

// ChangefeedVars contains some vars which can be used anywhere in a pipeline
Expand Down Expand Up @@ -184,6 +187,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
Expand Down
6 changes: 4 additions & 2 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -224,7 +225,6 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
defer func() {
_ = cli.Unwrap().Close()
}()

_, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
c.Check(err, check.IsNil)

Expand Down Expand Up @@ -273,7 +273,9 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
}

err = errg.Wait()
if err != nil && (errors.Cause(err) == context.DeadlineExceeded || errors.Cause(err) == context.Canceled) {
if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
errors.Cause(err) == context.Canceled ||
strings.Contains(err.Error(), "etcdserver: request timeout")) {
return
}
c.Check(err, check.IsNil)
Expand Down
Loading

0 comments on commit 636c0f5

Please sign in to comment.