diff --git a/errors.toml b/errors.toml
index 689c375357a..1f024d331d4 100755
--- a/errors.toml
+++ b/errors.toml
@@ -861,6 +861,11 @@ error = '''
 waiting processor to handle the operation finished timeout
 '''
 
+["CDC:ErrWorkerPoolGracefulUnregisterTimedOut"]
+error = '''
+workerpool handle graceful unregister timed out
+'''
+
 ["CDC:ErrWorkerPoolHandleCancelled"]
 error = '''
 workerpool handle is cancelled
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index 46c963ff7ee..bbaf204040f 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -212,8 +212,9 @@ var (
 	ErrPipelineTryAgain = errors.Normalize("pipeline is full, please try again. Internal use only, report a bug if seen externally", errors.RFCCodeText("CDC:ErrPipelineTryAgain"))
 
 	// workerpool errors
-	ErrWorkerPoolHandleCancelled = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled"))
-	ErrAsyncPoolExited           = errors.Normalize("asyncPool has exited. Report a bug if seen externally.", errors.RFCCodeText("CDC:ErrAsyncPoolExited"))
+	ErrWorkerPoolHandleCancelled            = errors.Normalize("workerpool handle is cancelled", errors.RFCCodeText("CDC:ErrWorkerPoolHandleCancelled"))
+	ErrAsyncPoolExited                      = errors.Normalize("asyncPool has exited. Report a bug if seen externally.", errors.RFCCodeText("CDC:ErrAsyncPoolExited"))
+	ErrWorkerPoolGracefulUnregisterTimedOut = errors.Normalize("workerpool handle graceful unregister timed out", errors.RFCCodeText("CDC:ErrWorkerPoolGracefulUnregisterTimedOut"))
 
 	// unified sorter errors
 	ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating"))
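Note: callers are expected to match the new normalized error by its RFC code rather than by message string, which is the pattern the tests below use. A minimal standalone sketch of that call pattern (this file is illustrative and not part of the patch):

package main

import (
	"fmt"

	cerrors "github.com/pingcap/tiflow/pkg/errors"
)

func main() {
	// GenWithStackByArgs attaches a stack trace to the normalized error.
	err := cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.GenWithStackByArgs()

	// Equal compares the RFC error code, so it still matches after the
	// error has been wrapped with a stack.
	fmt.Println(cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.Equal(err)) // true
}
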
diff --git a/pkg/workerpool/async_pool_test.go b/pkg/workerpool/async_pool_test.go
index 0161b4e2ffc..d4918623700 100644
--- a/pkg/workerpool/async_pool_test.go
+++ b/pkg/workerpool/async_pool_test.go
@@ -18,22 +18,19 @@ import (
 	"math/rand"
 	"sync"
 	"sync/atomic"
+	"testing"
 	"time"
 
-	"github.com/pingcap/log"
-
-	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/pkg/util/testleak"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 )
 
-type asyncPoolSuite struct{}
+func TestBasic(t *testing.T) {
+	defer testleak.AfterTestT(t)()
 
-var _ = check.Suite(&asyncPoolSuite{})
-
-func (s *asyncPoolSuite) TestBasic(c *check.C) {
-	defer testleak.AfterTest(c)()
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -54,19 +51,20 @@ func (s *asyncPoolSuite) TestBasic(c *check.C) {
 			atomic.AddInt32(&sum, int32(finalI+1))
 			wg.Done()
 		})
-		c.Assert(err, check.IsNil)
+		require.Nil(t, err)
 	}
 
 	wg.Wait()
-	c.Assert(sum, check.Equals, int32(5050))
+	require.Equal(t, sum, int32(5050))
 
 	cancel()
 	err := errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *asyncPoolSuite) TestEventuallyRun(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestEventuallyRun(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -106,7 +104,7 @@ loop:
 			atomic.AddInt32(&sum, int32(finalI+1))
 		})
 		if err != nil {
-			c.Assert(err, check.ErrorMatches, "context canceled")
+			require.Regexp(t, "context canceled", err.Error())
 		} else {
 			sumExpected += int32(i + 1)
 		}
@@ -114,8 +112,8 @@ loop:
 
 	cancel()
 	err := errg.Wait()
-	c.Assert(err, check.IsNil)
-	c.Assert(sum, check.Equals, sumExpected)
+	require.Nil(t, err)
+	require.Equal(t, sum, sumExpected)
 }
 
 func runForDuration(ctx context.Context, duration time.Duration, f func(ctx context.Context) error) error {
diff --git a/pkg/workerpool/pool.go b/pkg/workerpool/pool.go
index eda6ed03c42..65bb252b73e 100644
--- a/pkg/workerpool/pool.go
+++ b/pkg/workerpool/pool.go
@@ -52,6 +52,10 @@ type EventHandle interface {
 	// Unregister returns. Unregister WILL NOT attempt to wait for pending events to complete, which means the last few events can be lost.
 	Unregister()
 
+	// GracefulUnregister removes the EventHandle after all pending events
+	// have been processed. It returns ErrWorkerPoolGracefulUnregisterTimedOut
+	// if the pending events cannot be processed within the given timeout.
+	GracefulUnregister(ctx context.Context, timeout time.Duration) error
+
 	// ErrCh returns a channel that outputs the first non-nil result of events submitted to this EventHandle.
 	// Note that a non-nil result of an event cancels the EventHandle, so there is at most one error.
 	ErrCh() <-chan error
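A sketch of the intended call pattern for the new API. The helper below and its five-second budget are illustrative assumptions, not part of the patch; note that even on timeout the handle still ends up cancelled, only the drain guarantee is lost:

package shutdown

import (
	"context"
	"time"

	"github.com/pingcap/log"
	cerrors "github.com/pingcap/tiflow/pkg/errors"
	"github.com/pingcap/tiflow/pkg/workerpool"
)

// drainAndUnregister waits for a handle's pending events instead of
// dropping them, but gives up rather than blocking shutdown forever.
func drainAndUnregister(ctx context.Context, handle workerpool.EventHandle) error {
	err := handle.GracefulUnregister(ctx, 5*time.Second)
	if cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.Equal(err) {
		// The handle is cancelled even on timeout; only the last few
		// pending events may have been dropped.
		log.Warn("graceful unregister timed out, pending events may be lost")
		return nil
	}
	return err
}
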
diff --git a/pkg/workerpool/pool_impl.go b/pkg/workerpool/pool_impl.go
index 97af0aaec33..578532f99ec 100644
--- a/pkg/workerpool/pool_impl.go
+++ b/pkg/workerpool/pool_impl.go
@@ -19,10 +19,9 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/pingcap/log"
-
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	cerrors "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/notify"
 	"go.uber.org/zap"
@@ -90,11 +89,19 @@ func (p *defaultPoolImpl) RegisterEvent(f func(ctx context.Context, event interf
 	return handler
 }
 
+type handleStatus = int32
+
+const (
+	handleRunning = handleStatus(iota)
+	handleCancelling
+	handleCancelled
+)
+
 type defaultEventHandle struct {
 	// the function to be run each time the event is triggered
 	f func(ctx context.Context, event interface{}) error
-	// whether this handle has been cancelled, must be accessed atomically
-	isCancelled int32
+	// the current lifecycle state of the handle, must be accessed atomically
+	status handleStatus
 	// channel for the error returned by f
 	errCh chan error
 	// the worker that the handle is associated with
@@ -120,7 +127,8 @@ type defaultEventHandle struct {
 }
 
 func (h *defaultEventHandle) AddEvent(ctx context.Context, event interface{}) error {
-	if atomic.LoadInt32(&h.isCancelled) == 1 {
+	status := atomic.LoadInt32(&h.status)
+	if status != handleRunning {
 		return cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs()
 	}
 
@@ -158,7 +166,10 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio
 }
 
 func (h *defaultEventHandle) Unregister() {
-	if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) {
+		// Call synchronize so that the return of Unregister cannot race
+		// with the error handler, if an error is already being processed.
+		h.worker.synchronize()
 		// already cancelled
 		return
 	}
@@ -172,6 +183,46 @@ func (h *defaultEventHandle) Unregister() {
 	h.doCancel(cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs())
 }
 
+func (h *defaultEventHandle) GracefulUnregister(ctx context.Context, timeout time.Duration) error {
+	if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelling) {
+		// already cancelling or cancelled
+		return nil
+	}
+
+	defer func() {
+		if !atomic.CompareAndSwapInt32(&h.status, handleCancelling, handleCancelled) {
+			// already cancelled
+			return
+		}
+
+		// Call synchronize so that all function executions related to this
+		// handle are linearized BEFORE GracefulUnregister returns.
+		h.worker.synchronize()
+		h.doCancel(cerrors.ErrWorkerPoolHandleCancelled.GenWithStackByArgs())
+	}()
+
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	doneCh := make(chan struct{})
+	select {
+	case <-ctx.Done():
+		return cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.GenWithStackByArgs()
+	case h.worker.taskCh <- task{
+		handle: h,
+		doneCh: doneCh,
+	}:
+	}
+
+	select {
+	case <-ctx.Done():
+		return cerrors.ErrWorkerPoolGracefulUnregisterTimedOut.GenWithStackByArgs()
+	case <-doneCh:
+	}
+
+	return nil
+}
+
 // callers of doCancel need to check h.isCancelled first.
 // DO NOT call doCancel multiple times on the same handle.
 func (h *defaultEventHandle) doCancel(err error) {
@@ -202,7 +253,7 @@ func (h *defaultEventHandle) HashCode() int64 {
 }
 
 func (h *defaultEventHandle) cancelWithErr(err error) {
-	if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) {
 		// already cancelled
 		return
 	}
@@ -236,6 +287,8 @@ func (h *defaultEventHandle) doTimer(ctx context.Context) error {
 type task struct {
 	handle *defaultEventHandle
 	f      func(ctx context.Context) error
+
+	doneCh chan struct{} // only used in implementing GracefulUnregister
 }
 
 type worker struct {
@@ -279,11 +332,19 @@ func (w *worker) run(ctx context.Context) error {
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
 		case task := <-w.taskCh:
-			if atomic.LoadInt32(&task.handle.isCancelled) == 1 {
+			if atomic.LoadInt32(&task.handle.status) == handleCancelled {
 				// ignored cancelled handle
 				continue
 			}
 
+			if task.doneCh != nil {
+				close(task.doneCh)
+				if task.f != nil {
+					log.L().DPanic("unexpected message handler func in cancellation task", zap.Stack("stack"))
+				}
+				continue
+			}
+
 			err := task.f(ctx)
 			if err != nil {
 				task.handle.cancelWithErr(err)
@@ -296,7 +357,7 @@ func (w *worker) run(ctx context.Context) error {
 
 		w.handleRWLock.RLock()
 		for handle := range w.handles {
-			if atomic.LoadInt32(&handle.isCancelled) == 1 {
+			if atomic.LoadInt32(&handle.status) == handleCancelled {
 				// ignored cancelled handle
 				continue
 			}
@@ -352,7 +413,7 @@ func (w *worker) synchronize() {
 			// likely the workerpool has deadlocked, or there is a bug
 			// in the event handlers.
 			logWarn("synchronize is taking too long, report a bug",
-				zap.Duration("elapsed", time.Since(startTime)),
+				zap.Duration("duration", time.Since(startTime)),
 				zap.Stack("stacktrace"))
 		}
 	}
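The drain guarantee above relies on each worker consuming its task channel in FIFO order: a sentinel task whose doneCh the worker closes proves that every event queued before it has already run. A standalone sketch of that barrier idea, with simplified stand-in types rather than the pool's real ones:

package barrier

// task mirrors the shape used above: a nil f with a non-nil doneCh acts
// as a sentinel rather than a unit of work.
type task struct {
	f      func()
	doneCh chan struct{}
}

// worker drains the channel in FIFO order, so by the time it closes a
// sentinel's doneCh, every task enqueued before the sentinel has run.
func worker(taskCh <-chan task) {
	for t := range taskCh {
		if t.doneCh != nil {
			close(t.doneCh)
			continue
		}
		t.f()
	}
}

// waitDrained enqueues a sentinel and blocks until the worker reaches it,
// which is the core of GracefulUnregister's wait (minus the timeout).
func waitDrained(taskCh chan<- task) {
	doneCh := make(chan struct{})
	taskCh <- task{doneCh: doneCh}
	<-doneCh
}
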
logWarn("synchronize is taking too long, report a bug", - zap.Duration("elapsed", time.Since(startTime)), + zap.Duration("duration", time.Since(startTime)), zap.Stack("stacktrace")) } } diff --git a/pkg/workerpool/pool_test.go b/pkg/workerpool/pool_test.go index 729c20d7a31..ae7729df82c 100644 --- a/pkg/workerpool/pool_test.go +++ b/pkg/workerpool/pool_test.go @@ -20,10 +20,10 @@ import ( "testing" "time" - "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -31,14 +31,9 @@ import ( "golang.org/x/time/rate" ) -func TestSuite(t *testing.T) { check.TestingT(t) } +func TestTaskError(t *testing.T) { + defer testleak.AfterTestT(t)() -type workerPoolSuite struct{} - -var _ = check.Suite(&workerPoolSuite{}) - -func (s *workerPoolSuite) TestTaskError(c *check.C) { - defer testleak.AfterTest(c)() ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) pool := newDefaultPoolImpl(&defaultHasher{}, 4) @@ -53,7 +48,7 @@ func (s *workerPoolSuite) TestTaskError(c *check.C) { } return nil }).OnExit(func(err error) { - c.Assert(err, check.ErrorMatches, "test error") + require.Regexp(t, "test error", err) }) var wg sync.WaitGroup @@ -63,16 +58,16 @@ func (s *workerPoolSuite) TestTaskError(c *check.C) { for i := 0; i < 10; i++ { err := handle.AddEvent(ctx, i) if err != nil { - c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*") + require.Regexp(t, ".*ErrWorkerPoolHandleCancelled.*", err) } } }() select { case <-ctx.Done(): - c.FailNow() + require.FailNow(t, "fail") case err := <-handle.ErrCh(): - c.Assert(err, check.ErrorMatches, "test error") + require.Regexp(t, "test error", err) } // Only cancel the context after all events have been sent, // otherwise the event delivery may fail due to context cancellation. 
@@ -80,11 +75,12 @@ func (s *workerPoolSuite) TestTaskError(c *check.C) {
 	cancel()
 
 	err := errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *workerPoolSuite) TestTimerError(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestTimerError(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -107,18 +103,19 @@ func (s *workerPoolSuite) TestTimerError(c *check.C) {
 
 	select {
 	case <-ctx.Done():
-		c.FailNow()
+		require.FailNow(t, "fail")
 	case err := <-handle.ErrCh():
-		c.Assert(err, check.ErrorMatches, "timer error")
+		require.Regexp(t, "timer error", err)
 	}
 
 	cancel()
 	err := errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *workerPoolSuite) TestMultiError(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestMultiError(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 
 	pool := newDefaultPoolImpl(&defaultHasher{}, 4)
@@ -141,16 +138,16 @@ func (s *workerPoolSuite) TestMultiError(c *check.C) {
 		for i := 0; i < 10; i++ {
 			err := handle.AddEvent(ctx, i)
 			if err != nil {
-				c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*")
+				require.Regexp(t, ".*ErrWorkerPoolHandleCancelled.*", err)
 			}
 		}
 	}()
 
 	select {
 	case <-ctx.Done():
-		c.FailNow()
+		require.FailNow(t, "fail")
 	case err := <-handle.ErrCh():
-		c.Assert(err, check.ErrorMatches, "test error")
+		require.Regexp(t, "test error", err)
 	}
 
 	// Only cancel the context after all events have been sent,
 	// otherwise the event delivery may fail due to context cancellation.
@@ -158,11 +155,12 @@ func (s *workerPoolSuite) TestMultiError(c *check.C) {
 	cancel()
 
 	err := errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *workerPoolSuite) TestCancelHandle(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestCancelHandle(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -188,8 +186,8 @@ func (s *workerPoolSuite) TestCancelHandle(c *check.C) {
 		}
 		err := handle.AddEvent(ctx, i)
 		if err != nil {
-			c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*")
-			c.Assert(i, check.GreaterEqual, 5000)
+			require.Regexp(t, ".*ErrWorkerPoolHandleCancelled.*", err)
+			require.GreaterOrEqual(t, i, 5000)
 			return nil
 		}
 		i++
@@ -199,7 +197,7 @@ func (s *workerPoolSuite) TestCancelHandle(c *check.C) {
 	for {
 		select {
 		case <-ctx.Done():
-			c.FailNow()
+			require.FailNow(t, "fail")
 		default:
 		}
 		if atomic.LoadInt32(&num) > 5000 {
@@ -208,7 +206,7 @@ func (s *workerPoolSuite) TestCancelHandle(c *check.C) {
 	}
 
 	err := failpoint.Enable("github.com/pingcap/tiflow/pkg/workerpool/addEventDelayPoint", "1*sleep(500)")
-	c.Assert(err, check.IsNil)
+	require.Nil(t, err)
 	defer func() {
 		_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/workerpool/addEventDelayPoint")
 	}()
@@ -219,18 +217,19 @@ func (s *workerPoolSuite) TestCancelHandle(c *check.C) {
 
 	lastNum := atomic.LoadInt32(&num)
 	for i := 0; i <= 1000; i++ {
-		c.Assert(atomic.LoadInt32(&num), check.Equals, lastNum)
+		require.Equal(t, atomic.LoadInt32(&num), lastNum)
 	}
 
 	time.Sleep(1 * time.Second)
 	cancel()
 
 	err = errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *workerPoolSuite) TestCancelTimer(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestCancelTimer(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -241,7 +240,7 @@ func (s *workerPoolSuite) TestCancelTimer(c *check.C) {
 	})
 
 	err := failpoint.Enable("github.com/pingcap/tiflow/pkg/workerpool/unregisterDelayPoint", "sleep(5000)")
-	c.Assert(err, check.IsNil)
+	require.Nil(t, err)
 	defer func() {
 		_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/workerpool/unregisterDelayPoint")
 	}()
@@ -257,7 +256,7 @@ func (s *workerPoolSuite) TestCancelTimer(c *check.C) {
 		for {
 			err := handle.AddEvent(ctx, i)
 			if err != nil {
-				c.Assert(err, check.ErrorMatches, ".*ErrWorkerPoolHandleCancelled.*")
+				require.Regexp(t, ".*ErrWorkerPoolHandleCancelled.*", err)
 				return nil
 			}
 			i++
@@ -268,11 +267,44 @@ func (s *workerPoolSuite) TestCancelTimer(c *check.C) {
 	cancel()
 
 	err = errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *workerPoolSuite) TestTimer(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestErrorAndCancelRace(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	defer cancel()
+
+	pool := newDefaultPoolImpl(&defaultHasher{}, 4)
+	errg, ctx := errgroup.WithContext(ctx)
+	errg.Go(func() error {
+		return pool.Run(ctx)
+	})
+
+	var racedVar int
+	handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
+		return errors.New("fake")
+	}).OnExit(func(err error) {
+		time.Sleep(100 * time.Millisecond)
+		racedVar++
+	})
+
+	err := handle.AddEvent(ctx, 0)
+	require.NoError(t, err)
+
+	time.Sleep(50 * time.Millisecond)
+	handle.Unregister()
+	racedVar++
+
+	cancel()
+	err = errg.Wait()
+	require.Regexp(t, "context canceled", err)
+}
+
+func TestTimer(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
@@ -295,8 +327,8 @@ func (s *workerPoolSuite) TestTimer(c *check.C) {
 	count := 0
 	handle.SetTimer(ctx, time.Second*1, func(ctx context.Context) error {
 		if !lastTime.IsZero() {
-			c.Assert(time.Since(lastTime), check.GreaterEqual, 900*time.Millisecond)
-			c.Assert(time.Since(lastTime), check.LessEqual, 1200*time.Millisecond)
+			require.GreaterOrEqual(t, time.Since(lastTime), 900*time.Millisecond)
+			require.LessOrEqual(t, time.Since(lastTime), 1200*time.Millisecond)
 		}
 		if count == 3 {
 			cancel()
@@ -309,11 +341,11 @@ func (s *workerPoolSuite) TestTimer(c *check.C) {
 	})
 
 	err := errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
-func (s *workerPoolSuite) TestBasics(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestBasics(t *testing.T) {
+	defer testleak.AfterTestT(t)()
 
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
 	defer cancel()
 
@@ -361,7 +393,7 @@ func (s *workerPoolSuite) TestBasics(c *check.C) {
 				default:
 				}
 				log.Debug("result received", zap.Int("id", finalI), zap.Int("result", n))
-				c.Assert(n, check.Equals, nextExpected)
+				require.Equal(t, n, nextExpected)
 				nextExpected++
 				if nextExpected == 256 {
 					break
@@ -375,20 +407,20 @@ func (s *workerPoolSuite) TestBasics(c *check.C) {
 	cancel()
 
 	err := errg.Wait()
-	c.Assert(err, check.ErrorMatches, "context canceled")
+	require.Regexp(t, "context canceled", err)
 }
 
 // TestCancelByAddEventContext makes sure that the event handle can be cancelled by the context used
 // to call `AddEvent`.
-func (s *workerPoolSuite) TestCancelByAddEventContext(c *check.C) {
-	defer testleak.AfterTest(c)()
+func TestCancelByAddEventContext(t *testing.T) {
+	defer testleak.AfterTestT(t)()
 
 	poolCtx, poolCancel := context.WithCancel(context.Background())
 	defer poolCancel()
 	pool := newDefaultPoolImpl(&defaultHasher{}, 4)
 	go func() {
 		err := pool.Run(poolCtx)
-		c.Assert(err, check.ErrorMatches, ".*context canceled.*")
+		require.Regexp(t, ".*context canceled.*", err)
 	}()
 
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
@@ -424,10 +456,114 @@ func (s *workerPoolSuite) TestCancelByAddEventContext(c *check.C) {
 	cancel()
 
 	err := errg.Wait()
-	c.Assert(err, check.IsNil)
+	require.Nil(t, err)
+}
+
+func TestGracefulUnregister(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
+	poolCtx, poolCancel := context.WithCancel(context.Background())
+	defer poolCancel()
+	pool := newDefaultPoolImpl(&defaultHasher{}, 4)
+	go func() {
+		err := pool.Run(poolCtx)
+		require.Regexp(t, ".*context canceled.*", err)
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
+	defer cancel()
+
+	waitCh := make(chan struct{})
+
+	var lastEventIdx int64
+	handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
+		select {
+		case <-ctx.Done():
+			return errors.Trace(ctx.Err())
+		case <-waitCh:
+		}
+
+		idx := event.(int64)
+		old := atomic.SwapInt64(&lastEventIdx, idx)
+		require.Equal(t, old+1, idx)
+		return nil
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		var maxEventIdx int64
+		for i := int64(0); ; i++ {
+			err := handle.AddEvent(ctx, i+1)
+			if cerror.ErrWorkerPoolHandleCancelled.Equal(err) {
+				maxEventIdx = i
+				break
+			}
+			require.NoError(t, err)
+			time.Sleep(time.Millisecond * 10)
+		}
+
+		require.Eventually(t, func() (success bool) {
+			return atomic.LoadInt64(&lastEventIdx) == maxEventIdx
+		}, time.Millisecond*500, time.Millisecond*10)
+	}()
+
+	time.Sleep(time.Millisecond * 200)
+	go func() {
+		close(waitCh)
+	}()
+	err := handle.GracefulUnregister(ctx, time.Second*10)
+	require.NoError(t, err)
+
+	err = handle.AddEvent(ctx, int64(0))
+	require.Error(t, err)
+	require.True(t, cerror.ErrWorkerPoolHandleCancelled.Equal(err))
+	require.Equal(t, handleCancelled, handle.(*defaultEventHandle).status)
+
+	wg.Wait()
+}
+
+func TestGracefulUnregisterTimeout(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
+	poolCtx, poolCancel := context.WithCancel(context.Background())
+	defer poolCancel()
+	pool := newDefaultPoolImpl(&defaultHasher{}, 4)
+	go func() {
+		err := pool.Run(poolCtx)
+		require.Regexp(t, ".*context canceled.*", err)
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
+	defer cancel()
+
+	waitCh := make(chan struct{})
+
+	handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
+		select {
+		case <-waitCh:
+			return nil
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	})
+
+	err := handle.AddEvent(ctx, 0)
+	require.NoError(t, err)
+
+	go func() {
+		time.Sleep(time.Millisecond * 100)
+		close(waitCh)
+	}()
+	err = handle.GracefulUnregister(ctx, time.Millisecond*10)
+	require.Error(t, err)
+	require.Truef(t, cerror.ErrWorkerPoolGracefulUnregisterTimedOut.Equal(err), "%s", err.Error())
 }
 
 func TestSynchronizeLog(t *testing.T) {
+	defer testleak.AfterTestT(t)()
+
 	w := newWorker()
 	w.isRunning = 1
 	// Always report "synchronize is taking too long".