Merge branch 'master' into dongmen/bidirectional_replicating

ti-chi-bot authored Nov 30, 2022
2 parents 6582b3f + 197bc0e, commit 251c56b

Showing 9 changed files with 289 additions and 26 deletions.
17 changes: 9 additions & 8 deletions cdc/owner/changefeed.go
@@ -42,7 +42,7 @@ import (
// newSchedulerFromCtx creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
ctx cdcContext.Context, startTs uint64,
ctx cdcContext.Context, pdClock pdutil.Clock,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
@@ -51,15 +51,16 @@ func newSchedulerFromCtx(
captureID := ctx.GlobalVars().CaptureInfo.ID
cfg := config.GetGlobalServerConfig().Debug
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID, startTs,
messageServer, messageRouter, ownerRev, cfg.Scheduler)
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, cfg.Scheduler, pdClock)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context, startTs uint64,
ctx cdcContext.Context,
pdClock pdutil.Clock,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, startTs)
return newSchedulerFromCtx(ctx, pdClock)
}

type changefeed struct {
@@ -126,7 +127,7 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
@@ -164,7 +165,7 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error),
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
c.newDDLPuller = newDDLPuller
@@ -539,7 +540,7 @@ LOOP:
zap.String("changefeed", c.id.ID))

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
c.scheduler, err = c.newScheduler(ctx, c.upstream.PDClock)
if err != nil {
return errors.Trace(err)
}
3 changes: 2 additions & 1 deletion cdc/owner/changefeed_test.go
@@ -35,6 +35,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
@@ -215,7 +216,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
},
// new scheduler
func(
ctx cdcContext.Context, startTs uint64,
ctx cdcContext.Context, pdClock pdutil.Clock,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
})
5 changes: 3 additions & 2 deletions cdc/owner/owner_test.go
@@ -31,6 +31,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
@@ -61,7 +62,7 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error),
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
@@ -101,7 +102,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
},
// new scheduler
func(
ctx cdcContext.Context, startTs uint64,
ctx cdcContext.Context, pdClock pdutil.Clock,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
22 changes: 18 additions & 4 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)
@@ -53,6 +54,7 @@ type coordinator struct {
replicationM *replication.Manager
captureM *member.CaptureManager
schedulerM *scheduler.Manager
pdClock pdutil.Clock

lastCollectTime time.Time
changefeedID model.ChangeFeedID
@@ -63,11 +65,11 @@ func NewCoordinator(
ctx context.Context,
captureID model.CaptureID,
changefeedID model.ChangeFeedID,
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
pdClock pdutil.Clock,
) (internal.Scheduler, error) {
trans, err := transport.NewTransport(
ctx, changefeedID, transport.SchedulerRole, messageServer, messageRouter)
@@ -76,6 +78,7 @@
}
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
coord.pdClock = pdClock
return coord, nil
}

@@ -236,7 +239,9 @@ func (c *coordinator) Close(ctx context.Context) {
// ===========

func (c *coordinator) poll(
ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
ctx context.Context,
checkpointTs model.Ts,
currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.maybeCollectMetrics()
@@ -260,10 +265,19 @@
}
msgBuf = append(msgBuf, msgs...)

pdTime := time.Now()
// only nil in unit test
if c.pdClock != nil {
pdTime, err = c.pdClock.CurrentTime()
if err != nil {
log.Warn("schedulerv3: failed to get pd time", zap.Error(err))
}
}

if !c.captureM.CheckAllCaptureInitialized() {
// Skip generating schedule tasks for replication manager,
// as not all captures are initialized.
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables)
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables, pdTime)
return newCheckpointTs, newResolvedTs, c.sendMsgs(ctx, msgBuf)
}

@@ -297,7 +311,7 @@ func (c *coordinator) poll(
}

// Checkpoint calculation
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables)
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables, pdTime)
return newCheckpointTs, newResolvedTs, nil
}

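The pdClock handed to the coordinator can be nil only when the struct is built directly in unit tests, which is why poll falls back to the local clock. Below is a minimal, self-contained sketch of that fallback (not TiCDC code): timeSource is a hypothetical stand-in for pdutil.Clock, of which only CurrentTime is exercised on this path.

package main

import (
    "fmt"
    "log"
    "time"
)

// timeSource stands in for pdutil.Clock; only the method used in poll()
// is modeled here, the real interface is larger.
type timeSource interface {
    CurrentTime() (time.Time, error)
}

// currentTime prefers the PD-backed clock and falls back to the local
// clock when the source is nil (unit tests) or returns an error.
func currentTime(src timeSource) time.Time {
    now := time.Now()
    if src == nil {
        return now
    }
    pdTime, err := src.CurrentTime()
    if err != nil {
        log.Printf("failed to get pd time, using local time: %v", err)
        return now
    }
    return pdTime
}

func main() {
    fmt.Println(currentTime(nil)) // nil clock: local time, as the unit tests rely on
}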
66 changes: 62 additions & 4 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -14,7 +14,9 @@
package replication

import (
"container/heap"
"math"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -28,6 +30,10 @@ import (

const (
checkpointCannotProceed = internal.CheckpointCannotProceed

defaultSlowTableHeapSize = 4
logSlowTablesLagThreshold = 30 * time.Second
logSlowTablesInterval = 1 * time.Minute
)

// Callback is invoked when something is done.
@@ -93,21 +99,29 @@ type Manager struct { //nolint:revive

changefeedID model.ChangeFeedID
slowestTableID model.TableID
slowTableHeap SetHeap
acceptAddTableTask int
acceptRemoveTableTask int
acceptMoveTableTask int
acceptBurstBalanceTask int

lastLogSlowTablesTime time.Time
}

// NewReplicationManager returns a new replication manager.
func NewReplicationManager(
maxTaskConcurrency int, changefeedID model.ChangeFeedID,
) *Manager {
slowTableHeap := make(SetHeap, 0, defaultSlowTableHeapSize)
heap.Init(&slowTableHeap)

return &Manager{
tables: make(map[int64]*ReplicationSet),
runningTasks: make(map[int64]*ScheduleTask),
maxTaskConcurrency: maxTaskConcurrency,
changefeedID: changefeedID,
tables: make(map[int64]*ReplicationSet),
runningTasks: make(map[int64]*ScheduleTask),
maxTaskConcurrency: maxTaskConcurrency,
changefeedID: changefeedID,
slowTableHeap: slowTableHeap,
lastLogSlowTablesTime: time.Now(),
}
}

@@ -474,6 +488,7 @@ func (r *Manager) RunningTasks() map[model.TableID]*ScheduleTask {
// AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.
func (r *Manager) AdvanceCheckpoint(
currentTables []model.TableID,
currentTime time.Time,
) (newCheckpointTs, newResolvedTs model.Ts) {
newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
slowestTableID := int64(0)
@@ -499,9 +514,52 @@
if slowestTableID != 0 {
r.slowestTableID = slowestTableID
}

// If the changefeed's checkpoint lag is larger than 30s,
// log the 4 slowest tables every minute, which can
// help us find the problematic tables.
checkpointLag := currentTime.Sub(oracle.GetTimeFromTS(newCheckpointTs))
if checkpointLag > logSlowTablesLagThreshold &&
time.Since(r.lastLogSlowTablesTime) > logSlowTablesInterval {
r.logSlowTableInfo(currentTables, currentTime)
r.lastLogSlowTablesTime = time.Now()
}

return newCheckpointTs, newResolvedTs
}

func (r *Manager) logSlowTableInfo(currentTables []model.TableID, currentTime time.Time) {
// find the slow tables
for _, tableID := range currentTables {
table, ok := r.tables[tableID]
if !ok {
continue
}
lag := currentTime.Sub(oracle.GetTimeFromTS(table.Checkpoint.CheckpointTs))
if lag < logSlowTablesLagThreshold {
continue
}
heap.Push(&r.slowTableHeap, table)
if r.slowTableHeap.Len() > defaultSlowTableHeapSize {
heap.Pop(&r.slowTableHeap)
}
}

num := r.slowTableHeap.Len()
for i := 0; i < num; i++ {
table := heap.Pop(&r.slowTableHeap).(*ReplicationSet)
log.Info("schedulerv3: slow table",
zap.String("namespace", r.changefeedID.Namespace),
zap.String("changefeed", r.changefeedID.ID),
zap.Int64("tableID", table.TableID),
zap.String("tableStatus", table.Stats.String()),
zap.Uint64("checkpointTs", table.Checkpoint.CheckpointTs),
zap.Uint64("resolvedTs", table.Checkpoint.ResolvedTs),
zap.Duration("checkpointLag", currentTime.
Sub(oracle.GetTimeFromTS(table.Checkpoint.CheckpointTs))))
}
}

// CollectMetrics collects metrics.
func (r *Manager) CollectMetrics() {
cf := r.changefeedID
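The new lag check in AdvanceCheckpoint compares a PD-provided wall-clock time against the wall-clock part of the checkpoint TSO. As a rough, self-contained sketch (not TiCDC code), this is the conversion oracle.GetTimeFromTS performs, assuming the standard TiDB TSO layout of physical milliseconds in the upper bits and an 18-bit logical counter:

package main

import (
    "fmt"
    "time"
)

// physicalFromTSO extracts the wall-clock part of a TiDB TSO: the upper
// bits hold a physical timestamp in milliseconds, the low 18 bits a
// logical counter.
func physicalFromTSO(ts uint64) time.Time {
    const logicalBits = 18
    return time.UnixMilli(int64(ts >> logicalBits))
}

func main() {
    // A checkpoint encoding "45 seconds ago"; the lag is computed against
    // a PD-provided current time, as AdvanceCheckpoint does above.
    pdTime := time.Now()
    checkpointTs := uint64(pdTime.Add(-45*time.Second).UnixMilli()) << 18
    lag := pdTime.Sub(physicalFromTSO(checkpointTs))
    fmt.Println(lag > 30*time.Second) // true: this changefeed would be logged
}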


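SetHeap itself is defined outside this diff, so the snippet below is only a sketch of the bounded-heap pattern logSlowTableInfo relies on, with hypothetical slowHeap and tableLag types standing in for SetHeap and ReplicationSet: keep the heap root at the least-slow table (largest checkpoint), so popping on overflow always discards the least slow entry and only the N slowest tables remain to be logged. The actual SetHeap ordering may differ.

package main

import (
    "container/heap"
    "fmt"
)

// tableLag models only what is needed to rank slowness.
type tableLag struct {
    TableID      int64
    CheckpointTs uint64
}

// slowHeap keeps the root at the LARGEST checkpoint, i.e. the least
// slow table, so an overflowing Pop drops it and the slowest stay.
type slowHeap []tableLag

func (h slowHeap) Len() int           { return len(h) }
func (h slowHeap) Less(i, j int) bool { return h[i].CheckpointTs > h[j].CheckpointTs }
func (h slowHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *slowHeap) Push(x any)        { *h = append(*h, x.(tableLag)) }
func (h *slowHeap) Pop() any {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

func main() {
    const keep = 4 // mirrors defaultSlowTableHeapSize
    h := make(slowHeap, 0, keep)
    heap.Init(&h)
    for _, t := range []tableLag{
        {1, 500}, {2, 100}, {3, 300}, {4, 200}, {5, 400}, {6, 50},
    } {
        heap.Push(&h, t)
        if h.Len() > keep {
            heap.Pop(&h) // drop the least slow table
        }
    }
    // The four slowest tables remain (IDs 2, 3, 4, 6); Pop returns the
    // least slow of them first.
    for h.Len() > 0 {
        fmt.Printf("%+v\n", heap.Pop(&h))
    }
}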