Skip to content

Commit

Permalink
Merge branch 'release-5.2' into batch-cherrypick-defaultvalue
Browse files Browse the repository at this point in the history
  • Loading branch information
maxshuang authored Apr 14, 2022
2 parents 418daf4 + d92f016 commit 51ae407
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 78 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,11 @@ error = '''
waiting processor to handle the operation finished timeout
'''

["CDC:ErrWorkerPoolGracefulUnregisterTimedOut"]
error = '''
workerpool handle graceful unregister timed out
'''

["CDC:ErrWorkerPoolHandleCancelled"]
error = '''
workerpool handle is cancelled
Expand Down
5 changes: 3 additions & 2 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ var (
ErrPipelineTryAgain = errors.Normalize("pipeline is full, please try again. Internal use only, report a bug if seen externally", errors.RFCCodeText("CDC:ErrPipelineTryAgain"))

// workerpool errors
ErrWorkerPoolHandleCancelled = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled"))
ErrAsyncPoolExited = errors.Normalize("asyncPool has exited. Report a bug if seen externally.", errors.RFCCodeText("CDC:ErrAsyncPoolExited"))
ErrWorkerPoolHandleCancelled = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled"))
ErrAsyncPoolExited = errors.Normalize("asyncPool has exited. Report a bug if seen externally.", errors.RFCCodeText("CDC:ErrAsyncPoolExited"))
ErrWorkerPoolGracefulUnregisterTimedOut = errors.Normalize("workerpool handle graceful unregister timed out", errors.RFCCodeText("CDC:ErrWorkerPoolGracefulUnregisterTimedOut"))

// unified sorter errors
ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating"))
Expand Down
30 changes: 14 additions & 16 deletions pkg/workerpool/async_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@ import (
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/log"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

type asyncPoolSuite struct{}
func TestBasic(t *testing.T) {
defer testleak.AfterTestT(t)()

var _ = check.Suite(&asyncPoolSuite{})

func (s *asyncPoolSuite) TestBasic(c *check.C) {
defer testleak.AfterTest(c)()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand All @@ -54,19 +51,20 @@ func (s *asyncPoolSuite) TestBasic(c *check.C) {
atomic.AddInt32(&sum, int32(finalI+1))
wg.Done()
})
c.Assert(err, check.IsNil)
require.Nil(t, err)
}

wg.Wait()
c.Assert(sum, check.Equals, int32(5050))
require.Equal(t, sum, int32(5050))

cancel()
err := errg.Wait()
c.Assert(err, check.ErrorMatches, "context canceled")
require.Regexp(t, "context canceled", err)
}

func (s *asyncPoolSuite) TestEventuallyRun(c *check.C) {
defer testleak.AfterTest(c)()
func TestEventuallyRun(t *testing.T) {
defer testleak.AfterTestT(t)()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand Down Expand Up @@ -106,16 +104,16 @@ loop:
atomic.AddInt32(&sum, int32(finalI+1))
})
if err != nil {
c.Assert(err, check.ErrorMatches, "context canceled")
require.Regexp(t, "context canceled", err.Error())
} else {
sumExpected += int32(i + 1)
}
}

cancel()
err := errg.Wait()
c.Assert(err, check.IsNil)
c.Assert(sum, check.Equals, sumExpected)
require.Nil(t, err)
require.Equal(t, sum, sumExpected)
}

func runForDuration(ctx context.Context, duration time.Duration, f func(ctx context.Context) error) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type EventHandle interface {
// Unregister returns. Unregister WILL NOT attempt to wait for pending events to complete, which means the last few events can be lost.
Unregister()

// GracefulUnregister removes the EventHandle after
// all pending events have been processed.
GracefulUnregister(ctx context.Context, timeout time.Duration) error

// ErrCh returns a channel that outputs the first non-nil result of events submitted to this EventHandle.
// Note that a non-nil result of an event cancels the EventHandle, so there is at most one error.
ErrCh() <-chan error
Expand Down
81 changes: 71 additions & 10 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/log"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/notify"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,11 +89,19 @@ func (p *defaultPoolImpl) RegisterEvent(f func(ctx context.Context, event interf
return handler
}

type handleStatus = int32

const (
handleRunning = handleStatus(iota)
handleCancelling
handleCancelled
)

type defaultEventHandle struct {
// the function to be run each time the event is triggered
f func(ctx context.Context, event interface{}) error
// whether this handle has been cancelled, must be accessed atomically
isCancelled int32
// must be accessed atomically
status handleStatus
// channel for the error returned by f
errCh chan error
// the worker that the handle is associated with
Expand All @@ -120,7 +127,8 @@ type defaultEventHandle struct {
}

func (h *defaultEventHandle) AddEvent(ctx context.Context, event interface{}) error {
if atomic.LoadInt32(&h.isCancelled) == 1 {
status := atomic.LoadInt32(&h.status)
if status != handleRunning {
return cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs()
}

Expand Down Expand Up @@ -158,7 +166,10 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio
}

func (h *defaultEventHandle) Unregister() {
if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) {
if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) {
// call synchronize so that the returning of Unregister cannot race
// with the calling of the errorHandler, if an error is already being processed.
h.worker.synchronize()
// already cancelled
return
}
Expand All @@ -172,6 +183,46 @@ func (h *defaultEventHandle) Unregister() {
h.doCancel(cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs())
}

func (h *defaultEventHandle) GracefulUnregister(ctx context.Context, timeout time.Duration) error {
if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelling) {
// already cancelling or cancelled
return nil
}

defer func() {
if !atomic.CompareAndSwapInt32(&h.status, handleCancelling, handleCancelled) {
// already cancelled
return
}

// call synchronize so that all function executions related to this handle will be
// linearized BEFORE Unregister.
h.worker.synchronize()
h.doCancel(cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs())
}()

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

doneCh := make(chan struct{})
select {
case <-ctx.Done():
return cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.GenWithStackByArgs()
case h.worker.taskCh <- task{
handle: h,
doneCh: doneCh,
}:
}

select {
case <-ctx.Done():
return cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.GenWithStackByArgs()
case <-doneCh:
}

return nil
}

// callers of doCancel need to check h.isCancelled first.
// DO NOT call doCancel multiple times on the same handle.
func (h *defaultEventHandle) doCancel(err error) {
Expand Down Expand Up @@ -202,7 +253,7 @@ func (h *defaultEventHandle) HashCode() int64 {
}

func (h *defaultEventHandle) cancelWithErr(err error) {
if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) {
if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) {
// already cancelled
return
}
Expand Down Expand Up @@ -236,6 +287,8 @@ func (h *defaultEventHandle) doTimer(ctx context.Context) error {
type task struct {
handle *defaultEventHandle
f func(ctx context.Context) error

doneCh chan struct{} // only used in implementing GracefulUnregister
}

type worker struct {
Expand Down Expand Up @@ -279,11 +332,19 @@ func (w *worker) run(ctx context.Context) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case task := <-w.taskCh:
if atomic.LoadInt32(&task.handle.isCancelled) == 1 {
if atomic.LoadInt32(&task.handle.status) == handleCancelled {
// ignored cancelled handle
continue
}

if task.doneCh != nil {
close(task.doneCh)
if task.f != nil {
log.L().DPanic("unexpected message handler func in cancellation task", zap.Stack("stack"))
}
continue
}

err := task.f(ctx)
if err != nil {
task.handle.cancelWithErr(err)
Expand All @@ -296,7 +357,7 @@ func (w *worker) run(ctx context.Context) error {

w.handleRWLock.RLock()
for handle := range w.handles {
if atomic.LoadInt32(&handle.isCancelled) == 1 {
if atomic.LoadInt32(&handle.status) == handleCancelled {
// ignored cancelled handle
continue
}
Expand Down Expand Up @@ -352,7 +413,7 @@ func (w *worker) synchronize() {
// likely the workerpool has deadlocked, or there is a bug
// in the event handlers.
logWarn("synchronize is taking too long, report a bug",
zap.Duration("elapsed", time.Since(startTime)),
zap.Duration("duration", time.Since(startTime)),
zap.Stack("stacktrace"))
}
}
Expand Down
Loading

0 comments on commit 51ae407

Please sign in to comment.