Skip to content

Commit

Permalink
processor: prevents individual table pipelines to block the entire pr…
Browse files Browse the repository at this point in the history
…ocessor (#2059)
  • Loading branch information
liuzix authored Jun 16, 2021
1 parent 633f935 commit 67c8cd3
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 9 deletions.
15 changes: 11 additions & 4 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type TablePipeline interface {
// UpdateBarrierTs updates the barrier ts in this table pipeline
UpdateBarrierTs(ts model.Ts)
// AsyncStop tells the pipeline to stop, and returns true is the pipeline is already stopped.
AsyncStop(targetTs model.Ts)
AsyncStop(targetTs model.Ts) bool
// Workload returns the workload of this table
Workload() model.WorkloadInfo
// Status returns the status of this table pipeline
Expand Down Expand Up @@ -93,21 +93,28 @@ func (t *tablePipelineImpl) CheckpointTs() model.Ts {
// UpdateBarrierTs updates the barrier ts in this table pipeline
func (t *tablePipelineImpl) UpdateBarrierTs(ts model.Ts) {
err := t.p.SendToFirstNode(pipeline.BarrierMessage(ts))
if err != nil && !cerror.ErrSendToClosedPipeline.Equal(err) {
if err != nil && !cerror.ErrSendToClosedPipeline.Equal(err) && !cerror.ErrPipelineTryAgain.Equal(err) {
log.Panic("unexpect error from send to first node", zap.Error(err))
}
}

// AsyncStop tells the pipeline to stop, and returns true is the pipeline is already stopped.
func (t *tablePipelineImpl) AsyncStop(targetTs model.Ts) {
func (t *tablePipelineImpl) AsyncStop(targetTs model.Ts) bool {
err := t.p.SendToFirstNode(pipeline.CommandMessage(&pipeline.Command{
Tp: pipeline.CommandTypeStopAtTs,
StoppedTs: targetTs,
}))
log.Info("send async stop signal to table", zap.Int64("tableID", t.tableID), zap.Uint64("targetTs", targetTs))
if err != nil && !cerror.ErrSendToClosedPipeline.Equal(err) {
if err != nil {
if cerror.ErrPipelineTryAgain.Equal(err) {
return false
}
if cerror.ErrSendToClosedPipeline.Equal(err) {
return true
}
log.Panic("unexpect error from send to first node", zap.Error(err))
}
return true
}

var workload = model.WorkloadInfo{Workload: 1}
Expand Down
8 changes: 7 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,13 @@ func (p *processor) handleTableOperation(ctx cdcContext.Context) error {
if opt.BoundaryTs < globalCheckpointTs {
log.Warn("the BoundaryTs of remove table operation is smaller than global checkpoint ts", zap.Uint64("globalCheckpointTs", globalCheckpointTs), zap.Any("operation", opt))
}
table.AsyncStop(opt.BoundaryTs)
if !table.AsyncStop(opt.BoundaryTs) {
// We use a Debug log because it is conceivable for the pipeline to block for a legitimate reason,
// and we do not want to alarm the user.
log.Debug("AsyncStop has failed, possible due to a full pipeline",
zap.Uint64("checkpointTs", table.CheckpointTs()), zap.Int64("tableID", tableID))
continue
}
patchOperation(tableID, func(operation *model.TableOperation) error {
operation.Status = model.OperProcessed
return nil
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ func (m *mockTablePipeline) UpdateBarrierTs(ts model.Ts) {
m.barrierTs = ts
}

func (m *mockTablePipeline) AsyncStop(targetTs model.Ts) {
func (m *mockTablePipeline) AsyncStop(targetTs model.Ts) bool {
m.stopTs = targetTs
return true
}

func (m *mockTablePipeline) Workload() model.WorkloadInfo {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ error = '''
pending region cancelled due to stream disconnecting
'''

["CDC:ErrPipelineTryAgain"]
error = '''
pipeline is full, please try again. Internal use only, report a bug if seen externally
'''

["CDC:ErrPrepareAvroFailed"]
error = '''
prepare avro failed
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ var (

// pipeline errors
ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline"))
ErrPipelineTryAgain = errors.Normalize("pipeline is full, please try again. Internal use only, report a bug if seen externally", errors.RFCCodeText("CDC:ErrPipelineTryAgain"))

// workerpool errors
ErrWorkerPoolHandleCancelled = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled"))
Expand Down
19 changes: 16 additions & 3 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -59,7 +60,11 @@ func NewPipeline(ctx context.Context, tickDuration time.Duration) *Pipeline {
for {
select {
case <-tickCh:
p.SendToFirstNode(TickMessage()) //nolint:errcheck
err := p.SendToFirstNode(TickMessage()) //nolint:errcheck
if err != nil {
// Errors here are innocent. It's okay for tick messages to get lost.
log.Debug("Error encountered when calling SendToFirstNode", zap.Error(err))
}
case <-ctx.Done():
p.close()
return
Expand Down Expand Up @@ -103,8 +108,16 @@ func (p *Pipeline) SendToFirstNode(msg *Message) error {
if p.isClosed {
return cerror.ErrSendToClosedPipeline.GenWithStackByArgs()
}
// The header channel should never be blocked
p.header <- msg

failpoint.Inject("PipelineSendToFirstNodeTryAgain", func() {
failpoint.Return(cerror.ErrPipelineTryAgain.GenWithStackByArgs())
})

select {
case p.header <- msg:
default:
return cerror.ErrPipelineTryAgain.GenWithStackByArgs()
}
return nil
}

Expand Down

0 comments on commit 67c8cd3

Please sign in to comment.