From 49ed6624a66b2d88e0f8459d3852a2c595866517 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 25 Apr 2022 17:58:42 +0800 Subject: [PATCH] sink/flowcontrol(ticdc): Parallel --- cdc/sink/flowcontrol/flow_control_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 5b77732d4b1..24f639fdf8a 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -40,6 +40,8 @@ func (c *mockCallBacker) cb() error { } func TestMemoryQuotaBasic(t *testing.T) { + t.Parallel() + controller := newTableMemoryQuota(1024) sizeCh := make(chan uint64, 1024) var ( @@ -80,6 +82,8 @@ func TestMemoryQuotaBasic(t *testing.T) { } func TestMemoryQuotaForceConsume(t *testing.T) { + t.Parallel() + controller := newTableMemoryQuota(1024) sizeCh := make(chan uint64, 1024) var ( @@ -126,6 +130,8 @@ func TestMemoryQuotaForceConsume(t *testing.T) { // TestMemoryQuotaAbort verifies that abort works func TestMemoryQuotaAbort(t *testing.T) { + t.Parallel() + controller := newTableMemoryQuota(1024) var wg sync.WaitGroup wg.Add(1) @@ -149,6 +155,8 @@ func TestMemoryQuotaAbort(t *testing.T) { // TestMemoryQuotaReleaseZero verifies that releasing 0 bytes is successful func TestMemoryQuotaReleaseZero(t *testing.T) { + t.Parallel() + controller := newTableMemoryQuota(1024) controller.release(0) } @@ -159,6 +167,8 @@ type mockedEvent struct { } func TestFlowControlBasic(t *testing.T) { + t.Parallel() + var consumedBytes uint64 ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() @@ -271,6 +281,8 @@ func TestFlowControlBasic(t *testing.T) { } func TestFlowControlAbort(t *testing.T) { + t.Parallel() + callBacker := &mockCallBacker{} controller := NewTableFlowController(1024) var wg sync.WaitGroup @@ -296,6 +308,8 @@ func TestFlowControlAbort(t *testing.T) { } func TestFlowControlCallBack(t *testing.T) { + t.Parallel() + var consumedBytes uint64 ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() @@ -401,6 +415,8 @@ func TestFlowControlCallBack(t *testing.T) { } func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { + t.Parallel() + var wg sync.WaitGroup controller := NewTableFlowController(512) wg.Add(1) @@ -441,6 +457,8 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { } func TestFlowControlCallBackError(t *testing.T) { + t.Parallel() + var wg sync.WaitGroup controller := NewTableFlowController(512) wg.Add(1) @@ -469,6 +487,8 @@ func TestFlowControlCallBackError(t *testing.T) { } func TestFlowControlConsumeLargerThanQuota(t *testing.T) { + t.Parallel() + controller := NewTableFlowController(1024) err := controller.Consume(1, 2048, func() error { t.Error("unreachable")