owner: Modified the update strategy of gcSafePoint (#1731) #1759

Merged · 6 commits · May 27, 2021
Changes from 2 commits
92 changes: 84 additions & 8 deletions cdc/owner.go
@@ -72,6 +72,24 @@ func (o *ownership) inc() {
}
}

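// minGCSafePointCacheEntry caches the minimum GC safepoint derived from PD, together with the time it was last refreshed.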
type minGCSafePointCacheEntry struct {
ts model.Ts
lastUpdated time.Time
}

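// getMinGCSafePointCache returns the lower bound that the GC safepoint may take: PD's current TS shifted back by gcTTL.
// The value is cached and refreshed at most once per MinGCSafePointCacheUpdateInterval.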
func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts {
if time.Now().After(o.minGCSafePointCache.lastUpdated.Add(MinGCSafePointCacheUpdateInterval)) {
physicalTs, logicalTs, err := o.pdClient.GetTS(ctx)
if err != nil {
log.Warn("failed to update minGCSafePointCache", zap.Error(err))
return o.minGCSafePointCache.ts
}
o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs)
o.minGCSafePointCache.lastUpdated = time.Now()
}
return o.minGCSafePointCache.ts
}

// Owner manages the cdc cluster
type Owner struct {
done chan struct{}
@@ -107,6 +125,8 @@ type Owner struct {
gcTTL int64
// last time the gc safepoint was updated; a zero value means it has not been updated or has been cleared
gcSafepointLastUpdate time.Time
// minGCSafePointCache caches the lower-bound ts derived from PD; it is refreshed every MinGCSafePointCacheUpdateInterval.
minGCSafePointCache minGCSafePointCacheEntry
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
flushChangefeedInterval time.Duration
@@ -118,6 +138,8 @@ const (
CDCServiceSafePointID = "ticdc"
// GCSafepointUpdateInterval is the minimal interval at which CDC can update the gc safepoint
GCSafepointUpdateInterval = time.Duration(2 * time.Second)
// MinGCSafePointCacheUpdateInterval is the interval at which minGCSafePointCache is refreshed
MinGCSafePointCacheUpdateInterval = time.Second * 2
)

// NewOwner creates a new Owner instance
@@ -718,13 +740,26 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
return nil
}

minCheckpointTs := uint64(math.MaxUint64)
staleChangeFeeds := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
gcSafePoint := uint64(math.MaxUint64)

// get the lower bound of gcSafePoint
minGCSafePoint := o.getMinGCSafePointCache(ctx)

if len(o.changeFeeds) > 0 {
snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
for id, changefeed := range o.changeFeeds {
snapshot[id] = changefeed.status
if changefeed.appliedCheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.appliedCheckpointTs
if changefeed.status.CheckpointTs < gcSafePoint {
gcSafePoint = changefeed.status.CheckpointTs
}
// If a changefeed's CheckpointTs < minGCSafePoint, it means this changefeed is stagnant.
// Such changefeeds are collected into staleChangeFeeds, and handleStaleChangeFeed() is then called to deal with them.
// A changefeed will not enter the map twice, because in run(),
// handleAdminJob() is always executed before flushChangeFeedInfos(),
// ensuring that the changefeeds previously put into staleChangeFeeds have been stopped and removed from o.changeFeeds.
if changefeed.status.CheckpointTs < minGCSafePoint {
staleChangeFeeds[id] = changefeed.status
}

phyTs := oracle.ExtractPhysical(changefeed.status.CheckpointTs)
@@ -744,13 +779,28 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
o.lastFlushChangefeeds = time.Now()
}
}

for _, status := range o.stoppedFeeds {
if status.CheckpointTs < minCheckpointTs {
minCheckpointTs = status.CheckpointTs
// If a stopped changefeed's CheckpointTs < minGCSafePoint, it means this changefeed is stagnant.
// It should never be resumed; that logic lives in newChangeFeed(),
// so we can skip it here.
if status.CheckpointTs < minGCSafePoint {
continue
}

if status.CheckpointTs < gcSafePoint {
gcSafePoint = status.CheckpointTs
}
}

// handle the stagnant changefeeds collected above
err := o.handleStaleChangeFeed(ctx, staleChangeFeeds, minGCSafePoint)
if err != nil {
log.Warn("failed to handleStaleChangeFeed", zap.Error(err))
}

if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval {
actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, gcSafePoint)
if err != nil {
sinceLastUpdate := time.Since(o.gcSafepointLastUpdate)
log.Warn("failed to update service safe point", zap.Error(err),
@@ -767,9 +817,9 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
actual = uint64(val.(int))
})

if actual > minCheckpointTs {
if actual > gcSafePoint {
// UpdateServiceGCSafePoint has failed.
log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("actual-safepoint", actual))
log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", gcSafePoint), zap.Uint64("actual-safepoint", actual))

for cfID, cf := range o.changeFeeds {
if cf.status.CheckpointTs < actual {
@@ -874,6 +924,7 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
// For `AdminResume`, we remove stopped feed in changefeed initialization phase.
// For `AdminRemove`, we need to update stoppedFeeds when removing a stopped changefeed.
if job.Type == model.AdminStop {
log.Debug("put changefeed into stoppedFeeds queue", zap.String("changefeed", job.CfID))
o.stoppedFeeds[job.CfID] = cf.status
}
for captureID := range cf.taskStatus {
@@ -1626,3 +1677,28 @@ func (o *Owner) startCaptureWatcher(ctx context.Context) {
}
}()
}

// handleStaleChangeFeed deals with the stagnant changefeeds collected in flushChangeFeedInfos:
// it enqueues an AdminStop job for each one, with an error code indicating that the changefeed is stagnant.
func (o *Owner) handleStaleChangeFeed(ctx context.Context, staleChangeFeeds map[model.ChangeFeedID]*model.ChangeFeedStatus, minGCSafePoint uint64) error {
for id, status := range staleChangeFeeds {
message := cerror.ErrSnapshotLostByGC.GenWithStackByArgs(status.CheckpointTs, minGCSafePoint).Error()
log.Warn("changefeed checkpoint is lagging too much, so it will be stopped", zap.String("changefeed", id), zap.String("error", message))
runningError := &model.RunningError{
Addr: util.CaptureAddrFromCtx(ctx),
Code: string(cerror.ErrSnapshotLostByGC.RFCCode()), // changefeed is stagnant
Message: message,
}

err := o.EnqueueJob(model.AdminJob{
CfID: id,
Type: model.AdminStop,
Error: runningError,
})
if err != nil {
return errors.Trace(err)
}
delete(staleChangeFeeds, id)
}
return nil
}
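
The selection logic above is easier to follow in isolation. The sketch below is not the PR code: it is a standalone Go program with plain uint64 TSOs, a simplified composeTS, a hypothetical pickGCSafePoint helper, and made-up sample values. Under those assumptions it mirrors how flushChangeFeedInfos now derives a floor from PD's time minus gcTTL, reports active changefeeds below the floor as stale, and skips stopped changefeeds that have fallen below it.

package main

import (
    "fmt"
    "math"
)

// composeTS mimics oracle.ComposeTS: the physical part (milliseconds)
// occupies the high bits, the logical counter the low 18 bits.
func composeTS(physicalMs, logical int64) uint64 {
    return uint64(physicalMs)<<18 + uint64(logical)
}

// pickGCSafePoint is a hypothetical helper mirroring the selection in
// flushChangeFeedInfos: every active changefeed contributes to the minimum,
// and those below the floor are additionally reported as stale; stopped
// changefeeds below the floor are skipped entirely.
func pickGCSafePoint(active, stopped map[string]uint64, floor uint64) (uint64, []string) {
    safePoint := uint64(math.MaxUint64)
    stale := []string{}
    for id, ckpt := range active {
        if ckpt < safePoint {
            safePoint = ckpt
        }
        if ckpt < floor {
            stale = append(stale, id) // handled by handleStaleChangeFeed afterwards
        }
    }
    for _, ckpt := range stopped {
        if ckpt < floor {
            continue // stagnant stopped feed: never resumed, never pins the safepoint
        }
        if ckpt < safePoint {
            safePoint = ckpt
        }
    }
    return safePoint, stale
}

func main() {
    nowMs := int64(1_700_000_000_000) // pretend PD physical time in milliseconds
    gcTTL := int64(86400)             // seconds, like Owner.gcTTL
    floor := composeTS(nowMs-gcTTL*1000, 0)

    active := map[string]uint64{
        "cf-healthy":  composeTS(nowMs-60_000, 0),       // one minute behind
        "cf-stagnant": composeTS(nowMs-2*gcTTL*1000, 0), // far beyond gcTTL
    }
    stopped := map[string]uint64{
        "cf-old-stopped": composeTS(nowMs-3*gcTTL*1000, 0), // ignored
    }

    safePoint, stale := pickGCSafePoint(active, stopped, floor)
    fmt.Println("gcSafePoint:", safePoint)
    fmt.Println("stale changefeeds:", stale)
}

Note that in this version a stagnant active changefeed still lowers the safepoint for the current round; it only stops holding the safepoint back once the AdminStop job enqueued by handleStaleChangeFeed has removed it from o.changeFeeds.
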
117 changes: 116 additions & 1 deletion cdc/owner_test.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
@@ -91,6 +92,16 @@ type mockPDClient struct {
mockPDFailure bool
}

func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
if m.mockPDFailure {
return 0, 0, errors.New("injected PD failure")
}
if m.mockSafePointLost {
return 0, 0, nil
}
return oracle.GetPhysical(time.Now()), 0, nil
}

func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
m.invokeCounter++

@@ -175,12 +186,114 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
s.TearDownTest(c)
}

// Test whether the owner handles stagnant changefeeds correctly, so that they cannot block the update of gcSafePoint.
// Once a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint,
// so we only need to verify that stagnant changefeeds are put into the stop queue.
func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{}
changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 1000,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
"test_change_feed_2": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: oracle.EncodeTSO(oracle.GetPhysical(time.Now())),
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
"test_change_feed_3": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 0,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
session: session,
pdClient: mockPDCli,
etcdClient: s.client,
lastFlushChangefeeds: time.Now(),
flushChangefeedInterval: 1 * time.Hour,
// gcSafepointLastUpdate: time.Now(),
gcTTL: 6, // 6 seconds
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
}

time.Sleep(3 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_3"], check.NotNil)
c.Assert(mockOwner.changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal)

time.Sleep(6 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 2)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_2"], check.NotNil)

s.TearDownTest(c)
}

func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{
mockSafePointLost: true,
}

changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
@@ -221,6 +334,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
pdClient: mockPDCli,
session: session,
@@ -230,6 +344,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
}

err = mockOwner.flushChangeFeedInfos(s.ctx)
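
The sleeps in TestOwnerHandleStaleChangeFeed encode timestamp arithmetic that is easy to miss, so here is a minimal sketch of it, assuming a gcTTL of 6 seconds as in the test. floorAt is a hypothetical stand-in for the cached minGCSafePoint; only oracle functions already imported by the test are used.

package main

import (
    "fmt"
    "time"

    "github.com/pingcap/tidb/store/tikv/oracle"
)

func main() {
    const gcTTL = 6 * time.Second
    setup := time.Now()
    // Like test_change_feed_2: a checkpoint that is fresh at setup time.
    feed2 := oracle.EncodeTSO(oracle.GetPhysical(setup))

    // floorAt approximates minGCSafePoint at time t: PD's physical time at t minus gcTTL.
    floorAt := func(t time.Time) uint64 {
        return oracle.ComposeTS(oracle.GetPhysical(t.Add(-gcTTL)), 0)
    }

    // First flush, roughly 3s after setup: the floor is still 3s behind feed 2's
    // checkpoint, so only the tiny checkpoints 1000 and 0 of feeds 1 and 3 fall below it.
    fmt.Println(feed2 < floorAt(setup.Add(3*time.Second))) // false: not stale yet

    // Second flush, roughly 9s after setup: the floor has moved past feed 2's
    // checkpoint, so it is now stale and gets stopped as well.
    fmt.Println(feed2 < floorAt(setup.Add(9*time.Second))) // true: stale
}
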
5 changes: 5 additions & 0 deletions errors.toml
@@ -651,6 +651,11 @@ error = '''
sink uri invalid
'''

["CDC:ErrSnapshotLostByGC"]
error = '''
fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than GC safepoint at %d
'''

["CDC:ErrSnapshotSchemaExists"]
error = '''
schema %s(%d) already exists
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
@@ -178,7 +178,7 @@ var (
ErrServiceSafepointLost = errors.Normalize("service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrServiceSafepointLost"))
ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed"))
ErrStartTsBeforeGC = errors.Normalize("fail to create changefeed because start-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrStartTsBeforeGC"))

ErrSnapshotLostByGC = errors.Normalize("fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrSnapshotLostByGC"))
// EtcdWorker related errors. Internal use only.
// ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort.
ErrEtcdTryAgain = errors.Normalize("the etcd txn should be aborted and retried immediately", errors.RFCCodeText("CDC:ErrEtcdTryAgain"))
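
For completeness, a minimal sketch of how the new error surfaces to a caller. The main function and the sample timestamps are hypothetical; GenWithStackByArgs, Equal and RFCCode are pingcap/errors methods already used by the owner code above, and the import path is the package touched in this diff.

package main

import (
    "fmt"

    cerror "github.com/pingcap/ticdc/pkg/errors"
)

func main() {
    checkpointTs, gcSafePoint := uint64(100), uint64(500)

    // Built the same way handleStaleChangeFeed builds it: both timestamps
    // are substituted into the message template listed in errors.toml.
    err := cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, gcSafePoint)
    fmt.Println(err)

    // Callers can recognize the condition by identity or by RFC code.
    if cerror.ErrSnapshotLostByGC.Equal(err) {
        fmt.Println("code:", cerror.ErrSnapshotLostByGC.RFCCode())
    }
}
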