From 1fda53b2fb128b6858ee4a09d346007dd2f656be Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sun, 8 Oct 2023 18:49:05 +0800 Subject: [PATCH 1/6] some fix to the scheduler. --- cdc/processor/processor.go | 2 +- .../v3/replication/replication_manager.go | 58 ++++++++++++++++++- .../internal/v3/scheduler/scheduler_basic.go | 4 +- .../v3/scheduler/scheduler_manager.go | 2 +- 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index af589634062..1e82c55a28c 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -767,7 +767,7 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str retry.WithIsRetryableErr(cerror.IsRetryableError)) if tableName == nil { - log.Warn("failed to get table name for metric") + log.Warn("failed to get table name for metric", zap.Any("tableID", tableID)) return strconv.Itoa(int(tableID)) } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 9a33d460bbf..72fc6e98731 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -16,6 +16,7 @@ package replication import ( "bytes" "container/heap" + "fmt" "math" "time" @@ -51,12 +52,30 @@ type BurstBalance struct { MoveTables []MoveTable } +func (b BurstBalance) String() string { + if len(b.AddTables) != 0 { + return fmt.Sprintf("BurstBalance, add tables: %v", b.AddTables) + } + if len(b.RemoveTables) != 0 { + return fmt.Sprintf("BurstBalance, remove tables: %v", b.RemoveTables) + } + if len(b.MoveTables) != 0 { + return fmt.Sprintf("BurstBalance, move tables: %v", b.MoveTables) + } + return "BurstBalance, no tables" +} + // MoveTable is a schedule task for moving a table. type MoveTable struct { Span tablepb.Span DestCapture model.CaptureID } +func (t MoveTable) String() string { + return fmt.Sprintf("MoveTable, span: %s, dest: %s", + t.Span.String(), t.DestCapture) +} + // AddTable is a schedule task for adding a table. type AddTable struct { Span tablepb.Span @@ -64,12 +83,22 @@ type AddTable struct { CheckpointTs model.Ts } +func (t AddTable) String() string { + return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d", + t.Span.String(), t.CaptureID, t.CheckpointTs) +} + // RemoveTable is a schedule task for removing a table. type RemoveTable struct { Span tablepb.Span CaptureID model.CaptureID } +func (t RemoveTable) String() string { + return fmt.Sprintf("RemoveTable, span: %s, capture: %s", + t.Span.String(), t.CaptureID) +} + // ScheduleTask is a schedule task that wraps add/move/remove table tasks. type ScheduleTask struct { //nolint:revive MoveTable *MoveTable @@ -94,6 +123,33 @@ func (s *ScheduleTask) Name() string { return "unknown" } +func (s *ScheduleTask) String() string { + if s.MoveTable != nil { + return s.MoveTable.String() + } + if s.AddTable != nil { + return s.AddTable.String() + } + if s.RemoveTable != nil { + return s.RemoveTable.String() + } + if s.BurstBalance != nil { + return s.BurstBalance.String() + } + return "" +} + +type ScheduleTasks []*ScheduleTask + +func (s ScheduleTasks) String() string { + var buf bytes.Buffer + for _, task := range s { + buf.WriteString(task.String()) + buf.WriteString(";") + } + return buf.String() +} + // Manager manages replications and running scheduling tasks. type Manager struct { //nolint:revive spans *spanz.BtreeMap[*ReplicationSet] @@ -687,7 +743,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) { zap.String("namespace", r.changefeedID.Namespace), zap.String("changefeed", r.changefeedID.ID), zap.Int64("tableID", table.Span.TableID), - zap.String("tableStatus", table.Stats.String()), + zap.String("tableStatus", table.State.String()), zap.Uint64("checkpointTs", table.Checkpoint.CheckpointTs), zap.Uint64("resolvedTs", table.Checkpoint.ResolvedTs), zap.Duration("checkpointLag", currentPDTime. diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go index f98ca68a0be..35137cff614 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go @@ -116,7 +116,7 @@ func (b *basicScheduler) Schedule( // Fast path for check whether two sets are identical: // If the length of currentTables and replications are equal, // and for all tables in currentTables have a record in replications. - if !tablesLenEqual || !tablesAllFind { + if !(tablesLenEqual && tablesAllFind) { // The two sets are not identical. We need to find removed tables. intersectionTable := spanz.NewBtreeMap[struct{}]() for _, span := range currentSpans { @@ -138,7 +138,7 @@ func (b *basicScheduler) Schedule( log.Info("schedulerv3: burst remove table", zap.String("namespace", b.changefeedID.Namespace), zap.String("changefeed", b.changefeedID.ID), - zap.Int("tableCount", len(newSpans))) + zap.Int("tableCount", len(rmSpans))) tasks = append(tasks, newBurstRemoveTables(rmSpans, replications, b.changefeedID)) } diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go index f45fd6100c9..db1554d58b4 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go @@ -92,7 +92,7 @@ func (sm *Manager) Schedule( zap.String("namespace", sm.changefeedID.Namespace), zap.String("changefeed", sm.changefeedID.ID), zap.Int("taskNumber", len(tasks)), - zap.Any("task", tasks), + zap.Stringer("task", replication.ScheduleTasks(tasks)), zap.String("scheduler", scheduler.Name())) return tasks } From be68b686c4ba3bc7472ef0c761f20bed424b7d95 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sun, 8 Oct 2023 19:14:50 +0800 Subject: [PATCH 2/6] adjust p2p client logs. --- pkg/p2p/grpc_client.go | 5 ----- pkg/p2p/message_router.go | 15 ++++++++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/p2p/grpc_client.go b/pkg/p2p/grpc_client.go index 76a03764625..a926d11b185 100644 --- a/pkg/p2p/grpc_client.go +++ b/pkg/p2p/grpc_client.go @@ -91,11 +91,6 @@ func (c *grpcMessageClient) Run( defer func() { c.isClosed.Store(true) close(c.closeCh) - - log.Info("peer message client exited", - zap.String("addr", addr), - zap.String("captureID", receiverID), - zap.Error(ret)) }() metricsClientCount := clientCount.With(prometheus.Labels{ diff --git a/pkg/p2p/message_router.go b/pkg/p2p/message_router.go index 8c6b931a08a..6178bb73e23 100644 --- a/pkg/p2p/message_router.go +++ b/pkg/p2p/message_router.go @@ -176,11 +176,16 @@ func (m *messageRouterImpl) GetClient(target NodeID) MessageClient { defer m.wg.Done() defer cancel() err := client.Run(ctx, "tcp", addr, target, m.credentials) - log.Warn("p2p client exited with error", - zap.String("addr", addr), - zap.String("targetCapture", target), - zap.Error(err)) - + if err != nil { + log.Warn("p2p client exited with error", + zap.String("addr", addr), + zap.String("targetCapture", target), + zap.Error(err)) + } else { + log.Info("peer message client exited", + zap.String("addr", addr), + zap.String("targetCapture", target)) + } if errors.Cause(err) != context.Canceled { // Send the error to the error channel. select { From e28a09f9f13fa302a53b593e68d8c5f6f5b2873a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sun, 8 Oct 2023 21:33:50 +0800 Subject: [PATCH 3/6] adjust logs in the scheduler. --- .../v3/replication/replication_manager.go | 2 +- .../v3/replication/replication_set.go | 39 ++++++++++++------- .../internal/v3/scheduler/scheduler_basic.go | 39 +++++++++++-------- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 72fc6e98731..01dcfc77f5a 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -351,7 +351,7 @@ func (r *Manager) HandleTasks( tasks []*ScheduleTask, ) ([]*schedulepb.Message, error) { // Check if a running task is finished. - toBeDeleted := []tablepb.Span{} + var toBeDeleted []tablepb.Span r.runningTasks.Ascend(func(span tablepb.Span, task *ScheduleTask) bool { if table, ok := r.spans.Get(span); ok { // If table is back to Replicating or Removed, diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index ae05321c823..47c48cf5cb8 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -394,7 +394,7 @@ func (r *ReplicationSet) poll( var msg *schedulepb.Message switch r.State { case ReplicationSetStateAbsent: - msg, stateChanged, err = r.pollOnAbsent(input, captureID) + stateChanged, err = r.pollOnAbsent(input, captureID) case ReplicationSetStatePrepare: msg, stateChanged, err = r.pollOnPrepare(input, captureID) case ReplicationSetStateCommit: @@ -430,16 +430,16 @@ func (r *ReplicationSet) poll( //nolint:unparam func (r *ReplicationSet) pollOnAbsent( input *tablepb.TableStatus, captureID model.CaptureID, -) (*schedulepb.Message, bool, error) { +) (bool, error) { switch input.State { case tablepb.TableStateAbsent: r.State = ReplicationSetStatePrepare err := r.setCapture(captureID, RoleSecondary) - return nil, true, errors.Trace(err) + return true, errors.Trace(err) case tablepb.TableStateStopped: // Ignore stopped table state as a capture may shutdown unexpectedly. - return nil, false, nil + return false, nil case tablepb.TableStatePreparing, tablepb.TableStatePrepared, tablepb.TableStateReplicating, @@ -449,7 +449,7 @@ func (r *ReplicationSet) pollOnAbsent( zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) - return nil, false, nil + return false, nil } func (r *ReplicationSet) pollOnPrepare( @@ -772,21 +772,26 @@ func (r *ReplicationSet) pollOnRemoving( }, }, false, nil case tablepb.TableStateAbsent, tablepb.TableStateStopped: - errField := zap.Skip() + var err error if r.Primary == captureID { r.clearPrimary() } else if r.isInRole(captureID, RoleSecondary) { - err := r.clearCapture(captureID, RoleSecondary) - errField = zap.Error(err) + err = r.clearCapture(captureID, RoleSecondary) } else { - err := r.clearCapture(captureID, RoleUndetermined) - errField = zap.Error(err) + err = r.clearCapture(captureID, RoleUndetermined) + } + if err != nil { + log.Warn("schedulerv3: replication state remove capture with error", + zap.Any("replicationSet", r), + zap.Stringer("tableState", input), + zap.String("captureID", captureID), + zap.Error(err)) + } else { + log.Info("schedulerv3: replication state remove capture", + zap.Any("replicationSet", r), + zap.Stringer("tableState", input), + zap.String("captureID", captureID)) } - log.Info("schedulerv3: replication state remove capture", - zap.Any("replicationSet", r), - zap.Stringer("tableState", input), - zap.String("captureID", captureID), - errField) return nil, false, nil case tablepb.TableStateStopping: return nil, false, nil @@ -919,7 +924,11 @@ func (r *ReplicationSet) handleCaptureShutdown( Span: r.Span, State: tablepb.TableStateStopped, } + oldState := r.State msgs, err := r.poll(&status, captureID) + log.Info("schedulerv3: replication state transition, capture shutdown", + zap.Any("replicationSet", r), + zap.Stringer("old", oldState), zap.Stringer("new", r.State)) return msgs, true, errors.Trace(err) } diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go index 35137cff614..09b4219ac04 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go @@ -101,13 +101,8 @@ func (b *basicScheduler) Schedule( zap.Any("allCaptureStatus", captures)) return tasks } - log.Info("schedulerv3: burst add table", - zap.String("namespace", b.changefeedID.Namespace), - zap.String("changefeed", b.changefeedID.ID), - zap.Strings("captureIDs", captureIDs), - zap.Int("tableCount", len(newSpans))) tasks = append( - tasks, newBurstAddTables(checkpointTs, newSpans, captureIDs)) + tasks, newBurstAddTables(b.changefeedID, checkpointTs, newSpans, captureIDs)) } // Build remove table tasks. @@ -121,10 +116,9 @@ func (b *basicScheduler) Schedule( intersectionTable := spanz.NewBtreeMap[struct{}]() for _, span := range currentSpans { _, ok := replications.Get(span) - if !ok { - continue + if ok { + intersectionTable.ReplaceOrInsert(span, struct{}{}) } - intersectionTable.ReplaceOrInsert(span, struct{}{}) } rmSpans := make([]tablepb.Span, 0) replications.Ascend(func(span tablepb.Span, value *replication.ReplicationSet) bool { @@ -135,10 +129,6 @@ func (b *basicScheduler) Schedule( return true }) if len(rmSpans) > 0 { - log.Info("schedulerv3: burst remove table", - zap.String("namespace", b.changefeedID.Namespace), - zap.String("changefeed", b.changefeedID.ID), - zap.Int("tableCount", len(rmSpans))) tasks = append(tasks, newBurstRemoveTables(rmSpans, replications, b.changefeedID)) } @@ -148,24 +138,34 @@ func (b *basicScheduler) Schedule( // newBurstAddTables add each new table to captures in a round-robin way. func newBurstAddTables( + changefeedID model.ChangeFeedID, checkpointTs model.Ts, newSpans []tablepb.Span, captureIDs []model.CaptureID, ) *replication.ScheduleTask { idx := 0 tables := make([]replication.AddTable, 0, len(newSpans)) for _, span := range newSpans { + targetCapture := captureIDs[idx] tables = append(tables, replication.AddTable{ Span: span, - CaptureID: captureIDs[idx], + CaptureID: targetCapture, CheckpointTs: checkpointTs, }) + log.Info("schedulerv3: burst add table", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("captureID", targetCapture), + zap.Any("tableID", span.TableID)) + idx++ if idx >= len(captureIDs) { idx = 0 } } - return &replication.ScheduleTask{BurstBalance: &replication.BurstBalance{ - AddTables: tables, - }} + return &replication.ScheduleTask{ + BurstBalance: &replication.BurstBalance{ + AddTables: tables, + }, + } } func newBurstRemoveTables( @@ -191,6 +191,11 @@ func newBurstRemoveTables( Span: span, CaptureID: captureID, }) + log.Info("schedulerv3: burst remove table", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("captureID", captureID), + zap.Any("tableID", span.TableID)) } return &replication.ScheduleTask{BurstBalance: &replication.BurstBalance{ RemoveTables: tables, From bc7254b88ecca6a9c5b574899c06a966fb030a9c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 9 Oct 2023 11:27:35 +0800 Subject: [PATCH 4/6] add more logs to debug the scheduler. --- .../v3/replication/replication_set.go | 106 +++++++++++++++--- .../internal/v3/scheduler/scheduler_basic.go | 23 ++-- 2 files changed, 109 insertions(+), 20 deletions(-) diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index 47c48cf5cb8..e23e04abf91 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -202,8 +202,10 @@ func NewReplicationSet( // We need to wait its state becomes Stopped or Absent before // proceeding further scheduling. log.Warn("schedulerv3: found a stopping capture during initializing", - zap.Any("replicationSet", r), + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Int64("tableID", table.Span.TableID), + zap.Any("replicationSet", r), zap.Any("status", tableStatus)) err := r.setCapture(captureID, RoleUndetermined) if err != nil { @@ -215,8 +217,10 @@ func NewReplicationSet( // Ignore stop state. default: log.Warn("schedulerv3: unknown table state", - zap.Any("replicationSet", r), + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Int64("tableID", table.Span.TableID), + zap.Any("replicationSet", r), zap.Any("status", tableStatus)) } } @@ -240,6 +244,8 @@ func NewReplicationSet( r.State = ReplicationSetStateRemoving } log.Info("schedulerv3: initialize replication set", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r)) return r, nil @@ -291,7 +297,10 @@ func (r *ReplicationSet) clearCapture(captureID model.CaptureID, role Role) erro func (r *ReplicationSet) promoteSecondary(captureID model.CaptureID) error { if r.Primary == captureID { - log.Warn("schedulerv3: capture is already promoted", + log.Warn("schedulerv3: capture is already promoted as the primary", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.String("captureID", captureID), zap.Any("replicationSet", r)) return nil } @@ -319,6 +328,8 @@ func (r *ReplicationSet) inconsistentError( input *tablepb.TableStatus, captureID model.CaptureID, msg string, fields ...zap.Field, ) error { fields = append(fields, []zap.Field{ + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.String("captureID", captureID), zap.Stringer("tableState", input), zap.Any("replicationSet", r), @@ -332,6 +343,8 @@ func (r *ReplicationSet) multiplePrimaryError( input *tablepb.TableStatus, captureID model.CaptureID, msg string, fields ...zap.Field, ) error { fields = append(fields, []zap.Field{ + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.String("captureID", captureID), zap.Stringer("tableState", input), zap.Any("replicationSet", r), @@ -415,12 +428,12 @@ func (r *ReplicationSet) poll( } if stateChanged { log.Info("schedulerv3: replication state transition, poll", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Stringer("old", oldState), - zap.Stringer("new", r.State), - zap.String("namespace", r.Changefeed.Namespace), - zap.String("changefeed", r.Changefeed.ID)) + zap.Stringer("new", r.State)) } } @@ -446,6 +459,8 @@ func (r *ReplicationSet) pollOnAbsent( tablepb.TableStateStopping: } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -493,6 +508,8 @@ func (r *ReplicationSet) pollOnPrepare( // Primary is stopped, but we may still has secondary. // Clear primary and promote secondary when it's prepared. log.Info("schedulerv3: primary is stopped during Prepare", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -501,6 +518,8 @@ func (r *ReplicationSet) pollOnPrepare( } if r.isInRole(captureID, RoleSecondary) { log.Info("schedulerv3: capture is stopped during Prepare", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -521,6 +540,8 @@ func (r *ReplicationSet) pollOnPrepare( } } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -554,6 +575,8 @@ func (r *ReplicationSet) pollOnCommit( // before promoting the secondary, otherwise there may be two // primary that write data and lead to data inconsistency. log.Info("schedulerv3: there are unknown captures during commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("captureID", captureID)) @@ -566,6 +589,8 @@ func (r *ReplicationSet) pollOnCommit( } log.Info("schedulerv3: promote secondary, no primary", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("captureID", captureID)) @@ -597,6 +622,8 @@ func (r *ReplicationSet) pollOnCommit( if !r.hasRole(RoleSecondary) { // If there is no secondary, transit to Absent. log.Info("schedulerv3: primary is stopped during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -610,6 +637,8 @@ func (r *ReplicationSet) pollOnCommit( return nil, false, errors.Trace(err) } log.Info("schedulerv3: replication state promote secondary", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("original", original), @@ -632,6 +661,8 @@ func (r *ReplicationSet) pollOnCommit( // upon entering Commit state. Do not change state and wait // the original primary reports its table. log.Info("schedulerv3: secondary is stopped during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -646,6 +677,8 @@ func (r *ReplicationSet) pollOnCommit( return nil, true, nil } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopped during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -696,6 +729,8 @@ func (r *ReplicationSet) pollOnCommit( return nil, false, err } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopping during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -705,6 +740,8 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStatePreparing: } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -737,6 +774,8 @@ func (r *ReplicationSet) pollOnReplicating( // Primary is stopped, but we still has secondary. // Clear primary and promote secondary when it's prepared. log.Info("schedulerv3: primary is stopped during Replicating", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -746,6 +785,8 @@ func (r *ReplicationSet) pollOnReplicating( } } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -782,12 +823,16 @@ func (r *ReplicationSet) pollOnRemoving( } if err != nil { log.Warn("schedulerv3: replication state remove capture with error", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Error(err)) } else { log.Info("schedulerv3: replication state remove capture", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("captureID", captureID)) @@ -797,6 +842,8 @@ func (r *ReplicationSet) pollOnRemoving( return nil, false, nil } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -815,7 +862,10 @@ func (r *ReplicationSet) handleAddTable( // Ignore add table if it's not in Absent state. if r.State != ReplicationSetStateAbsent { log.Warn("schedulerv3: add table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } err := r.setCapture(captureID, RoleSecondary) @@ -834,6 +884,8 @@ func (r *ReplicationSet) handleAddTable( } log.Info("schedulerv3: replication state transition, add table", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("old", oldState), zap.Stringer("new", r.State)) return msgs, nil @@ -845,7 +897,10 @@ func (r *ReplicationSet) handleMoveTable( // Ignore move table if it has been removed already. if r.hasRemoved() { log.Warn("schedulerv3: move table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } // Ignore move table if @@ -853,7 +908,10 @@ func (r *ReplicationSet) handleMoveTable( // 2) the dest capture is the primary. if r.State != ReplicationSetStateReplicating || r.Primary == dest { log.Warn("schedulerv3: move table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } oldState := r.State @@ -863,8 +921,11 @@ func (r *ReplicationSet) handleMoveTable( return nil, errors.Trace(err) } log.Info("schedulerv3: replication state transition, move table", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Stringer("new", r.State), zap.Any("replicationSet", r), - zap.Stringer("old", oldState), zap.Stringer("new", r.State)) + zap.Stringer("old", oldState)) status := tablepb.TableStatus{ Span: r.Span, State: tablepb.TableStateAbsent, @@ -877,20 +938,29 @@ func (r *ReplicationSet) handleRemoveTable() ([]*schedulepb.Message, error) { // Ignore remove table if it has been removed already. if r.hasRemoved() { log.Warn("schedulerv3: remove table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } // Ignore remove table if it's not in Replicating state. if r.State != ReplicationSetStateReplicating { log.Warn("schedulerv3: remove table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } oldState := r.State r.State = ReplicationSetStateRemoving log.Info("schedulerv3: replication state transition, remove table", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), zap.Any("replicationSet", r), - zap.Stringer("old", oldState), zap.Stringer("new", r.State)) + zap.Stringer("old", oldState)) status := tablepb.TableStatus{ Span: r.Span, State: tablepb.TableStateReplicating, @@ -927,6 +997,9 @@ func (r *ReplicationSet) handleCaptureShutdown( oldState := r.State msgs, err := r.poll(&status, captureID) log.Info("schedulerv3: replication state transition, capture shutdown", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), zap.Any("replicationSet", r), zap.Stringer("old", oldState), zap.Stringer("new", r.State)) return msgs, true, errors.Trace(err) @@ -937,6 +1010,9 @@ func (r *ReplicationSet) updateCheckpointAndStats( ) error { if checkpoint.ResolvedTs < checkpoint.CheckpointTs { log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), zap.Any("replicationSet", r), zap.Any("checkpoint", checkpoint)) @@ -953,6 +1029,10 @@ func (r *ReplicationSet) updateCheckpointAndStats( } if r.Checkpoint.ResolvedTs < r.Checkpoint.CheckpointTs { log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r), zap.Any("checkpointTs", r.Checkpoint.CheckpointTs), zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go index 09b4219ac04..2efffdbda7a 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go @@ -128,9 +128,10 @@ func (b *basicScheduler) Schedule( } return true }) - if len(rmSpans) > 0 { - tasks = append(tasks, - newBurstRemoveTables(rmSpans, replications, b.changefeedID)) + + removeTableTasks := newBurstRemoveTables(rmSpans, replications, b.changefeedID) + if removeTableTasks != nil { + tasks = append(tasks, removeTableTasks) } } return tasks @@ -181,7 +182,8 @@ func newBurstRemoveTables( break } if captureID == "" { - log.Warn("schedulerv3: primary or secondary not found for removed table", + log.Warn("schedulerv3: primary or secondary not found for removed table,"+ + "this may happen if the capture shutdown", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID), zap.Any("table", rep)) @@ -197,7 +199,14 @@ func newBurstRemoveTables( zap.String("captureID", captureID), zap.Any("tableID", span.TableID)) } - return &replication.ScheduleTask{BurstBalance: &replication.BurstBalance{ - RemoveTables: tables, - }} + + if len(tables) == 0 { + return nil + } + + return &replication.ScheduleTask{ + BurstBalance: &replication.BurstBalance{ + RemoveTables: tables, + }, + } } From 9213c2ee1bd967345f6aad860b951908f634a39a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 9 Oct 2023 15:07:51 +0800 Subject: [PATCH 5/6] fix logs. --- .../internal/v3/replication/replication_manager.go | 11 ----------- .../internal/v3/replication/replication_set.go | 11 +++-------- .../internal/v3/scheduler/scheduler_manager.go | 6 ------ 3 files changed, 3 insertions(+), 25 deletions(-) diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 01dcfc77f5a..0ec7c7a671b 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -139,17 +139,6 @@ func (s *ScheduleTask) String() string { return "" } -type ScheduleTasks []*ScheduleTask - -func (s ScheduleTasks) String() string { - var buf bytes.Buffer - for _, task := range s { - buf.WriteString(task.String()) - buf.WriteString(";") - } - return buf.String() -} - // Manager manages replications and running scheduling tasks. type Manager struct { //nolint:revive spans *spanz.BtreeMap[*ReplicationSet] diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index e23e04abf91..d0c155f7b35 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -829,13 +829,9 @@ func (r *ReplicationSet) pollOnRemoving( zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Error(err)) - } else { - log.Info("schedulerv3: replication state remove capture", - zap.String("namespace", r.Changefeed.Namespace), - zap.String("changefeed", r.Changefeed.ID), - zap.Any("replicationSet", r), - zap.Stringer("tableState", input), - zap.String("captureID", captureID)) + } + if len(r.Captures) == 0 { + r.State = ReplicationSetStateAbsent } return nil, false, nil case tablepb.TableStateStopping: @@ -1032,7 +1028,6 @@ func (r *ReplicationSet) updateCheckpointAndStats( zap.String("namespace", r.Changefeed.Namespace), zap.String("changefeed", r.Changefeed.ID), zap.Int64("tableID", r.Span.TableID), - zap.Any("replicationSet", r), zap.Any("checkpointTs", r.Checkpoint.CheckpointTs), zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go index db1554d58b4..76817b344f6 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go @@ -88,12 +88,6 @@ func (sm *Manager) Schedule( sm.tasksCounter[name]++ } if len(tasks) != 0 { - log.Info("schedulerv3: new schedule task", - zap.String("namespace", sm.changefeedID.Namespace), - zap.String("changefeed", sm.changefeedID.ID), - zap.Int("taskNumber", len(tasks)), - zap.Stringer("task", replication.ScheduleTasks(tasks)), - zap.String("scheduler", scheduler.Name())) return tasks } } From 7354390e96dd58041dc24b3760fddb4e35083aa3 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 9 Oct 2023 16:45:16 +0800 Subject: [PATCH 6/6] tiny adjust. --- cdc/scheduler/internal/v3/replication/replication_set.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index d0c155f7b35..c88ee162e2e 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -830,9 +830,6 @@ func (r *ReplicationSet) pollOnRemoving( zap.String("captureID", captureID), zap.Error(err)) } - if len(r.Captures) == 0 { - r.State = ReplicationSetStateAbsent - } return nil, false, nil case tablepb.TableStateStopping: return nil, false, nil