Skip to content

Commit

Permalink
gc (ticdc): optimize the algorithm calculating gc safepoint (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 1, 2023
1 parent 566b9f1 commit be85fcc
Show file tree
Hide file tree
Showing 17 changed files with 141 additions and 111 deletions.
4 changes: 2 additions & 2 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestGetChangeFeed(t *testing.T) {
statusProvider.changefeedInfo = &model.ChangeFeedInfo{
ID: validID,
Error: &model.RunningError{
Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
Code: string(cerrors.ErrStartTsBeforeGC.RFCCode()),
},
}
statusProvider.changefeedStatus = &model.ChangeFeedStatus{
Expand All @@ -287,7 +287,7 @@ func TestGetChangeFeed(t *testing.T) {
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, resp.ID, validID)
require.Contains(t, resp.Error.Code, "ErrGCTTLExceeded")
require.Contains(t, resp.Error.Code, "ErrStartTsBeforeGC")

// success
statusProvider.changefeedInfo = &model.ChangeFeedInfo{ID: validID}
Expand Down
20 changes: 0 additions & 20 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,26 +458,6 @@ func TestFixState(t *testing.T) {
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(errors.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(errors.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
})
if err := c.upstream.GCManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
Expand Down
25 changes: 20 additions & 5 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// To avoid thunderherd, a random factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 90 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

Expand Down Expand Up @@ -73,8 +74,8 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
// MaxElapsedTime=0 means the backoff never stops
f.errBackoff.MaxElapsedTime = 0
// backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)
Expand Down Expand Up @@ -135,6 +136,7 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
case model.StateError:
if m.state.Info.Error.IsChangefeedUnRetryableError() {
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}
}
Expand Down Expand Up @@ -536,12 +538,25 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
m.patchState(model.StateError)
} else {
oldBackoffInterval := m.backoffInterval
// NextBackOff will never return -1 because the backoff never stops
// with `MaxElapsedTime=0`
// ref: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L121-L123

m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// NextBackOff() will return -1 once the MaxElapsedTime has elapsed.
if m.backoffInterval == m.errBackoff.Stop {
log.Warn("The changefeed won't be restarted "+
"as it has been experiencing failures for "+
"an extended duration",
zap.Duration(
"maxElapsedTime",
m.errBackoff.MaxElapsedTime,
),
)
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}

log.Info("changefeed restart backoff interval is changed",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
Expand Down
48 changes: 26 additions & 22 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "CDC:ErrGCTTLExceeded",
Code: "CDC:ErrStartTsBeforeGC",
Message: "fake error for test",
}}, true, nil
})
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestHandleFastFailError(t *testing.T) {
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "CDC:ErrGCTTLExceeded",
Code: "CDC:ErrStartTsBeforeGC",
Message: "fake error for test",
}}, true, nil
})
Expand Down Expand Up @@ -565,33 +565,37 @@ func TestBackoffStopsUnexpectedly(t *testing.T) {
tester.MustApplyPatches()

for i := 1; i <= 10; i++ {
require.Equal(t, state.Info.State, model.StateNormal)
require.True(t, manager.ShouldRunning())
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrEtcdSessionDone]",
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
// after round 8, the maxElapsedTime of backoff will exceed 4000ms,
// and NextBackOff() will return -1, so the changefeed state will
// never turn into error state.
if i >= 8 {
require.True(t, manager.ShouldRunning())
require.Equal(t, state.Info.State, model.StateNormal)
// after round 8, the maxElapsedTime of backoff will exceed 4000ms,
// and NextBackOff() will return -1, so the changefeed state will
// never turn into error state.
require.Equal(t, state.Info.State, model.StateFailed)
require.False(t, manager.ShouldRunning())
} else {
require.Equal(t, state.Info.State, model.StateNormal)
require.True(t, manager.ShouldRunning())
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (
*model.TaskPosition, bool, error,
) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrEtcdSessionDone]",
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
// If an error occurs, backing off from running the task.
require.False(t, manager.ShouldRunning())
require.Equal(t, state.Info.State, model.StateError)
require.Equal(t, state.Info.AdminJobType, model.AdminStop)
require.Equal(t, state.Status.AdminJobType, model.AdminStop)
}
// 500ms is the backoff interval, so sleep 500ms and after a manager tick,
// the changefeed will turn into normal state

// 500ms is the backoff interval, so sleep 500ms and after a manager
// tick, the changefeed will turn into normal state
time.Sleep(500 * time.Millisecond)
manager.Tick(state)
tester.MustApplyPatches()
Expand Down
25 changes: 25 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,26 @@ func (o *ownerImpl) updateGCSafepoint(
return nil
}

// ignoreFailedChangeFeedWhenGC checks if a failed changefeed should be ignored
// when calculating the gc safepoint of the associated upstream.
func (o *ownerImpl) ignoreFailedChangeFeedWhenGC(
state *orchestrator.ChangefeedReactorState,
) bool {
upID := state.Info.UpstreamID
us, exist := o.upstreamManager.Get(upID)
if !exist {
log.Warn("upstream not found", zap.Uint64("ID", upID))
return false
}
// in case the changefeed failed right after it is created
// and the status is not initialized yet.
ts := state.Info.StartTs
if state.Status != nil {
ts = state.Status.CheckpointTs
}
return us.GCManager.IgnoreFailedChangeFeed(ts)
}

// calculateGCSafepoint calculates GCSafepoint for different upstream.
// Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
// to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
Expand All @@ -789,8 +809,13 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
if changefeedState.Info == nil {
continue
}

switch changefeedState.Info.State {
case model.StateNormal, model.StateStopped, model.StateError:
case model.StateFailed:
if o.ignoreFailedChangeFeedWhenGC(changefeedState) {
continue
}
default:
continue
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type mockManager struct {
func (m *mockManager) CheckStaleCheckpointTs(
ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs()
return cerror.ErrStartTsBeforeGC.GenWithStackByArgs()
}

var _ gc.Manager = (*mockManager)(nil)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestCreateRemoveChangefeed(t *testing.T) {
Error: nil,
}

// this will make changefeed always meet ErrGCTTLExceeded
// this will make changefeed always meet ErrStartTsBeforeGC
up, _ := owner.upstreamManager.Get(changefeedInfo.UpstreamID)
mockedManager := &mockManager{Manager: up.GCManager}
up.GCManager = mockedManager
Expand Down
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,6 @@ error = '''
event is larger than the total memory quota, size: %d, quota: %d
'''

["CDC:ErrGCTTLExceeded"]
error = '''
the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL
'''

["CDC:ErrGRPCDialFailed"]
error = '''
grpc dial failed
Expand Down
5 changes: 0 additions & 5 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,11 +752,6 @@ var (
" caused by GC. checkpoint-ts %d is earlier than or equal to GC safepoint at %d",
errors.RFCCodeText("CDC:ErrSnapshotLostByGC"),
)
ErrGCTTLExceeded = errors.Normalize(
"the checkpoint-ts(%d) lag of the changefeed(%s) "+
"has exceeded the GC TTL",
errors.RFCCodeText("CDC:ErrGCTTLExceeded"),
)
ErrNotOwner = errors.Normalize(
"this capture is not a owner",
errors.RFCCodeText("CDC:ErrNotOwner"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
// wants to replicate has been or will be GC. So it makes no sense to try to
// resume the changefeed, and the changefeed should immediately be failed.
var changeFeedFastFailError = []*errors.Error{
ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
ErrSnapshotLostByGC, ErrStartTsBeforeGC,
}

// IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
Expand Down
17 changes: 1 addition & 16 deletions pkg/errors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,11 @@ func TestIsRetryableError(t *testing.T) {

func TestChangefeedFastFailError(t *testing.T) {
t.Parallel()
err := ErrGCTTLExceeded.FastGenByArgs()
err := ErrSnapshotLostByGC.FastGenByArgs()
rfcCode, _ := RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrGCTTLExceeded.GenWithStack("aa")
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrGCTTLExceeded.Wrap(errors.New("aa"))
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrSnapshotLostByGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrStartTsBeforeGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
Expand Down
Loading

0 comments on commit be85fcc

Please sign in to comment.