diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go
index b0c53180aed..beb8e812e4f 100644
--- a/cdc/processor/pipeline/table.go
+++ b/cdc/processor/pipeline/table.go
@@ -49,7 +49,7 @@ type TablePipeline interface {
 	// UpdateBarrierTs updates the barrier ts in this table pipeline
 	UpdateBarrierTs(ts model.Ts)
 	// AsyncStop tells the pipeline to stop, and returns true if the pipeline is already stopped.
-	AsyncStop(targetTs model.Ts)
+	AsyncStop(targetTs model.Ts) bool
 	// Workload returns the workload of this table
 	Workload() model.WorkloadInfo
 	// Status returns the status of this table pipeline
@@ -93,21 +93,28 @@ func (t *tablePipelineImpl) CheckpointTs() model.Ts {
 // UpdateBarrierTs updates the barrier ts in this table pipeline
 func (t *tablePipelineImpl) UpdateBarrierTs(ts model.Ts) {
 	err := t.p.SendToFirstNode(pipeline.BarrierMessage(ts))
-	if err != nil && !cerror.ErrSendToClosedPipeline.Equal(err) {
+	if err != nil && !cerror.ErrSendToClosedPipeline.Equal(err) && !cerror.ErrPipelineTryAgain.Equal(err) {
 		log.Panic("unexpect error from send to first node", zap.Error(err))
 	}
 }
 
 // AsyncStop tells the pipeline to stop, and returns true if the pipeline is already stopped.
-func (t *tablePipelineImpl) AsyncStop(targetTs model.Ts) {
+func (t *tablePipelineImpl) AsyncStop(targetTs model.Ts) bool {
 	err := t.p.SendToFirstNode(pipeline.CommandMessage(&pipeline.Command{
 		Tp:        pipeline.CommandTypeStopAtTs,
 		StoppedTs: targetTs,
 	}))
 	log.Info("send async stop signal to table", zap.Int64("tableID", t.tableID), zap.Uint64("targetTs", targetTs))
-	if err != nil && !cerror.ErrSendToClosedPipeline.Equal(err) {
+	if err != nil {
+		if cerror.ErrPipelineTryAgain.Equal(err) {
+			return false
+		}
+		if cerror.ErrSendToClosedPipeline.Equal(err) {
+			return true
+		}
 		log.Panic("unexpect error from send to first node", zap.Error(err))
 	}
+	return true
 }
 
 var workload = model.WorkloadInfo{Workload: 1}
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 09342e402a2..9342c51ad86 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -365,7 +365,13 @@ func (p *processor) handleTableOperation(ctx cdcContext.Context) error {
 			if opt.BoundaryTs < globalCheckpointTs {
 				log.Warn("the BoundaryTs of remove table operation is smaller than global checkpoint ts", zap.Uint64("globalCheckpointTs", globalCheckpointTs), zap.Any("operation", opt))
 			}
-			table.AsyncStop(opt.BoundaryTs)
+			if !table.AsyncStop(opt.BoundaryTs) {
+				// We use a Debug log because it is conceivable for the pipeline to block for a legitimate reason,
+				// and we do not want to alarm the user.
+				log.Debug("AsyncStop has failed, possibly due to a full pipeline",
+					zap.Uint64("checkpointTs", table.CheckpointTs()), zap.Int64("tableID", tableID))
+				continue
+			}
 			patchOperation(tableID, func(operation *model.TableOperation) error {
 				operation.Status = model.OperProcessed
 				return nil
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index 33d9003a598..d1fa9558835 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -85,8 +85,9 @@ func (m *mockTablePipeline) UpdateBarrierTs(ts model.Ts) {
 	m.barrierTs = ts
 }
 
-func (m *mockTablePipeline) AsyncStop(targetTs model.Ts) {
+func (m *mockTablePipeline) AsyncStop(targetTs model.Ts) bool {
 	m.stopTs = targetTs
+	return true
 }
 
 func (m *mockTablePipeline) Workload() model.WorkloadInfo {
diff --git a/errors.toml b/errors.toml
index 57afbe46746..32046f1d124 100755
--- a/errors.toml
+++ b/errors.toml
@@ -571,6 +571,11 @@ error = '''
 pending region cancelled due to stream disconnecting
 '''
 
+["CDC:ErrPipelineTryAgain"]
+error = '''
+pipeline is full, please try again. Internal use only, report a bug if seen externally
+'''
+
 ["CDC:ErrPrepareAvroFailed"]
 error = '''
 prepare avro failed
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index b5bbf71115a..803ffce6b5d 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -200,6 +200,7 @@ var (
 
 	// pipeline errors
 	ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline"))
+	ErrPipelineTryAgain     = errors.Normalize("pipeline is full, please try again. Internal use only, report a bug if seen externally", errors.RFCCodeText("CDC:ErrPipelineTryAgain"))
 
 	// workerpool errors
 	ErrWorkerPoolHandleCancelled = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled"))
diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go
index 842af22f7b0..9c519cbfe4d 100644
--- a/pkg/pipeline/pipeline.go
+++ b/pkg/pipeline/pipeline.go
@@ -17,6 +17,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/pkg/context"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
@@ -59,7 +60,11 @@ func NewPipeline(ctx context.Context, tickDuration time.Duration) *Pipeline {
 		for {
 			select {
 			case <-tickCh:
-				p.SendToFirstNode(TickMessage()) //nolint:errcheck
+				err := p.SendToFirstNode(TickMessage())
+				if err != nil {
+					// Errors here are innocent. It's okay for tick messages to get lost.
+					log.Debug("Error encountered when calling SendToFirstNode", zap.Error(err))
+				}
 			case <-ctx.Done():
 				p.close()
 				return
@@ -103,8 +108,16 @@ func (p *Pipeline) SendToFirstNode(msg *Message) error {
 	if p.isClosed {
 		return cerror.ErrSendToClosedPipeline.GenWithStackByArgs()
 	}
-	// The header channel should never be blocked
-	p.header <- msg
+
+	failpoint.Inject("PipelineSendToFirstNodeTryAgain", func() {
+		failpoint.Return(cerror.ErrPipelineTryAgain.GenWithStackByArgs())
+	})
+
+	select {
+	case p.header <- msg:
+	default:
+		return cerror.ErrPipelineTryAgain.GenWithStackByArgs()
+	}
 	return nil
 }