Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker/: fix worker stuck in unitTransWaitCondition #589

Merged
merged 10 commits into from
Apr 20, 2020
18 changes: 15 additions & 3 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ func (h *realRelayHolder) Migrate(ctx context.Context, binlogName string, binlog

type dummyRelayHolder struct {
sync.RWMutex
initError error
stage pb.Stage
initError error
stage pb.Stage
relayBinlog string

cfg *config.SourceConfig
}
Expand All @@ -397,6 +398,14 @@ func NewDummyRelayHolder(cfg *config.SourceConfig) RelayHolder {
}
}

// NewDummyRelayHolderWithRelayBinlog creates a dummy RelayHolder that carries
// the given source config and remembers relayBinlog, so the holder can expose
// that value as its relay binlog position.
func NewDummyRelayHolderWithRelayBinlog(cfg *config.SourceConfig, relayBinlog string) RelayHolder {
	holder := new(dummyRelayHolder)
	holder.cfg = cfg
	holder.relayBinlog = relayBinlog
	return holder
}

// NewDummyRelayHolderWithInitError creates a new RelayHolder with init error
func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder {
return &dummyRelayHolder{
Expand Down Expand Up @@ -433,7 +442,10 @@ func (d *dummyRelayHolder) Close() {
func (d *dummyRelayHolder) Status() *pb.RelayStatus {
d.Lock()
defer d.Unlock()
return &pb.RelayStatus{Stage: d.stage}
return &pb.RelayStatus{
Stage: d.stage,
RelayBinlog: d.relayBinlog,
}
}

// Error implements interface of RelayHolder
Expand Down
74 changes: 54 additions & 20 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ type SubTask struct {
l log.Logger

sync.RWMutex
wg sync.WaitGroup
wg sync.WaitGroup
// ctx is used for the whole subtask. It is created only once, when the subtask is constructed.
ctx context.Context
cancel context.CancelFunc
// currCtx is used for a single run loop. A new one is created each time st.run or st.Resume is called.
currCtx context.Context
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
currCancel context.CancelFunc
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

units []unit.Unit // units do job one by one
currUnit unit.Unit
Expand All @@ -98,10 +102,13 @@ func NewRealSubTask(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Sub

// NewSubTaskWithStage creates a new SubTask with stage
func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *clientv3.Client) *SubTask {
ctx, cancel := context.WithCancel(context.Background())
st := SubTask{
cfg: cfg,
stage: stage,
l: log.With(zap.String("subtask", cfg.Name)),
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
}
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
Expand Down Expand Up @@ -186,36 +193,59 @@ func (st *SubTask) Run() {
}

func (st *SubTask) run() {
st.setStage(pb.Stage_Paused)
err := st.unitTransWaitCondition()
st.setStage(pb.Stage_Running)
ctx, cancel := context.WithCancel(st.ctx)
st.setCurrCtx(ctx, cancel)
err := st.unitTransWaitCondition(ctx)
if err != nil {
st.l.Error("wait condition", log.ShortError(err))
st.fail(err)
return
} else if ctx.Err() != nil {
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the stage remain Stage_Running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx.Err() != nil means this context was canceled by another goroutine; that goroutine will change the stage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, how about checking the stage in the unit test cases?

}

st.setStage(pb.Stage_Running)
st.setResult(nil) // clear previous result
cu := st.CurrUnit()
st.l.Info("start to run", zap.Stringer("unit", cu.Type()))
st.ctx, st.cancel = context.WithCancel(context.Background())
pr := make(chan pb.ProcessResult, 1)
st.wg.Add(1)
go st.fetchResult(pr)
go cu.Process(st.ctx, pr)
go cu.Process(ctx, pr)
}

// setCurrCtx installs ctx and cancel as the context of the current run loop.
// If a cancel func from an earlier loop is still stored, it is invoked first
// so that the earlier loop's context is canceled before being replaced.
// The whole swap happens under the subtask's write lock.
func (st *SubTask) setCurrCtx(ctx context.Context, cancel context.CancelFunc) {
	st.Lock()
	defer st.Unlock()

	// call previous cancel func for safety
	if prevCancel := st.currCancel; prevCancel != nil {
		prevCancel()
	}
	st.currCtx, st.currCancel = ctx, cancel
}

// callCurrCancel cancels the context of the current run loop, if there is one.
//
// currCancel is only assigned by setCurrCtx, so it may still be nil when no
// run loop has been started yet. Calling a nil func value panics in Go, which
// would also leave the read lock held; in that case this method is a no-op.
// The read lock is released via defer so it is dropped on every path.
func (st *SubTask) callCurrCancel() {
	st.RLock()
	defer st.RUnlock()

	if st.currCancel != nil {
		st.currCancel()
	}
}

// fetchResult fetches units process result
// when a dm-unit reports an error, we need to re-Process the sub task
func (st *SubTask) fetchResult(pr chan pb.ProcessResult) {
defer st.wg.Done()

st.RLock()
ctx := st.currCtx
st.RUnlock()

select {
case <-st.ctx.Done():
case <-ctx.Done():
return
case result := <-pr:
st.setResult(&result) // save result
st.cancel() // dm-unit finished, canceled or error occurred, always cancel processing
st.callCurrCancel() // dm-unit finished, canceled or error occurred, always cancel processing

if len(result.Errors) == 0 && st.Stage() == pb.Stage_Paused {
return // paused by external request
Expand Down Expand Up @@ -384,16 +414,16 @@ func (st *SubTask) Result() *pb.ProcessResult {
// Close stops the sub task
func (st *SubTask) Close() {
st.l.Info("closing")
if st.cancel == nil {
st.l.Info("not run yet, no need to close")
if st.Stage() == pb.Stage_Stopped {
st.l.Info("subTask is already closed, no need to close")
return
}

st.cancel()
st.closeUnits() // close all un-closed units
st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped)
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name)
st.wg.Wait()
st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped)
}

// Pause pauses the running sub task
Expand All @@ -402,7 +432,7 @@ func (st *SubTask) Pause() error {
return terror.ErrWorkerNotRunningStage.Generate()
}

st.cancel()
st.callCurrCancel()
st.wg.Wait() // wait fetchResult return

cu := st.CurrUnit()
Expand All @@ -420,27 +450,29 @@ func (st *SubTask) Resume() error {
return nil
}

if !st.stageCAS(pb.Stage_Paused, pb.Stage_Running) {
return terror.ErrWorkerNotPausedStage.Generate()
}
ctx, cancel := context.WithCancel(st.ctx)
st.setCurrCtx(ctx, cancel)
// NOTE: this may block if the user resumes a task
err := st.unitTransWaitCondition()
err := st.unitTransWaitCondition(ctx)
if err != nil {
st.l.Error("wait condition", log.ShortError(err))
st.setStage(pb.Stage_Paused)
return err
}

if !st.stageCAS(pb.Stage_Paused, pb.Stage_Running) {
return terror.ErrWorkerNotPausedStage.Generate()
} else if ctx.Err() != nil {
return nil
}

st.setResult(nil) // clear previous result
cu := st.CurrUnit()
st.l.Info("resume with unit", zap.Stringer("unit", cu.Type()))

st.ctx, st.cancel = context.WithCancel(context.Background())
pr := make(chan pb.ProcessResult, 1)
st.wg.Add(1)
go st.fetchResult(pr)
go cu.Resume(st.ctx, pr)
go cu.Resume(ctx, pr)
return nil
}

Expand Down Expand Up @@ -538,7 +570,7 @@ func (st *SubTask) ShardDDLOperation() *pessimism.Operation {
// unitTransWaitCondition waits when transferring from current unit to next unit.
// Currently there is only one wait condition
// from Load unit to Sync unit, wait for relay-log to catch up with mydumper binlog position.
func (st *SubTask) unitTransWaitCondition() error {
func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error {
pu := st.PrevUnit()
cu := st.CurrUnit()
if pu != nil && pu.Type() == pb.UnitType_Load && cu.Type() == pb.UnitType_Sync {
Expand Down Expand Up @@ -572,6 +604,8 @@ func (st *SubTask) unitTransWaitCondition() error {
select {
case <-ctx.Done():
return terror.ErrWorkerWaitRelayCatchupTimeout.Generate(waitRelayCatchupTimeout, pos1, pos2)
case <-subTaskCtx.Done():
return nil
case <-time.After(time.Millisecond * 50):
}
}
Expand Down
71 changes: 70 additions & 1 deletion dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"go.etcd.io/etcd/clientv3"
)

const (
// loadMetaBinlog is the binlog position the mocked Load unit reports as its MetaBinlog.
// It must be greater than relayHolderBinlog; presumably this keeps the relay log
// "behind" the load position so the subtask stays in unitTransWaitCondition.
loadMetaBinlog = "(mysql-bin.00001,154)"
// relayHolderBinlog is the relay binlog position given to the dummy relay holder in tests.
relayHolderBinlog = "(mysql-bin.00001,150)"
)

type testSubTask struct{}

var _ = Suite(&testSubTask{})
Expand Down Expand Up @@ -123,7 +129,7 @@ func (m *MockUnit) Status() interface{} {
case pb.UnitType_Dump:
return &pb.DumpStatus{}
case pb.UnitType_Load:
return &pb.LoadStatus{}
return &pb.LoadStatus{MetaBinlog: loadMetaBinlog}
case pb.UnitType_Sync:
return &pb.SyncStatus{}
default:
Expand Down Expand Up @@ -479,3 +485,66 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
c.Assert(st.CurrUnit(), Equals, nil)
c.Assert(st.Result(), IsNil)
}

// TestSubtaskFastQuit checks that a subtask whose run() is waiting in
// unitTransWaitCondition can still be paused or closed promptly: after
// Pause() or Close() is called, run() must return within 0.5s and the stage
// must end up as Stage_Paused or Stage_Stopped respectively.
func (t *testSubTask) TestSubtaskFastQuit(c *C) {
// case: the subtask is stuck in unitTransWaitCondition
cfg := &config.SubTaskConfig{
Name: "testSubtaskFastQuit",
Mode: config.ModeAll,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w := &Worker{
ctx: ctx,
// the mocked load unit's MetaBinlog (loadMetaBinlog) must be greater than
// the relay binlog position reported by this dummy relay holder
relayHolder: NewDummyRelayHolderWithRelayBinlog(config.NewSourceConfig(), relayHolderBinlog),
}
InitConditionHub(w)

// a Load unit as previous unit and a Sync unit as current unit is the
// transition on which unitTransWaitCondition waits
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSyncer := NewMockUnit(pb.UnitType_Sync)

st := NewSubTaskWithStage(cfg, pb.Stage_Paused, nil)
st.prevUnit = mockLoader
st.currUnit = mockSyncer

// run the subtask in the background; finished is closed once run() returns
finished := make(chan struct{})
go func() {
st.run()
close(finished)
}()

// test Pause
time.Sleep(time.Second) // wait for task to run for some time
c.Assert(st.Stage(), Equals, pb.Stage_Running)
c.Assert(st.Pause(), IsNil)
// run() must return shortly after Pause()
select {
case <-time.After(500 * time.Millisecond):
c.Fatal("fail to pause subtask in 0.5s when stuck into unitTransWaitCondition")
case <-finished:
}
c.Assert(st.Stage(), Equals, pb.Stage_Paused)

// start over with a fresh subtask for the Close case
st = NewSubTaskWithStage(cfg, pb.Stage_Paused, nil)
st.prevUnit = mockLoader
st.currUnit = mockSyncer

finished = make(chan struct{})
go func() {
st.run()
close(finished)
}()

time.Sleep(time.Second)
c.Assert(st.Stage(), Equals, pb.Stage_Running)
// test Close
st.Close()
// run() must return shortly after Close()
select {
case <-time.After(500 * time.Millisecond):
c.Fatal("fail to stop subtask in 0.5s when stuck into unitTransWaitCondition")
case <-finished:
}
c.Assert(st.Stage(), Equals, pb.Stage_Stopped)
}