Skip to content

Commit

Permalink
sink/flowcontrol(ticdc): Parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Apr 25, 2022
1 parent d21b7b0 commit 49ed662
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions cdc/sink/flowcontrol/flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (c *mockCallBacker) cb() error {
}

func TestMemoryQuotaBasic(t *testing.T) {
t.Parallel()

controller := newTableMemoryQuota(1024)
sizeCh := make(chan uint64, 1024)
var (
Expand Down Expand Up @@ -80,6 +82,8 @@ func TestMemoryQuotaBasic(t *testing.T) {
}

func TestMemoryQuotaForceConsume(t *testing.T) {
t.Parallel()

controller := newTableMemoryQuota(1024)
sizeCh := make(chan uint64, 1024)
var (
Expand Down Expand Up @@ -126,6 +130,8 @@ func TestMemoryQuotaForceConsume(t *testing.T) {

// TestMemoryQuotaAbort verifies that abort works
func TestMemoryQuotaAbort(t *testing.T) {
t.Parallel()

controller := newTableMemoryQuota(1024)
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -149,6 +155,8 @@ func TestMemoryQuotaAbort(t *testing.T) {

// TestMemoryQuotaReleaseZero verifies that releasing 0 bytes is successful
func TestMemoryQuotaReleaseZero(t *testing.T) {
t.Parallel()

controller := newTableMemoryQuota(1024)
controller.release(0)
}
Expand All @@ -159,6 +167,8 @@ type mockedEvent struct {
}

func TestFlowControlBasic(t *testing.T) {
t.Parallel()

var consumedBytes uint64
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -271,6 +281,8 @@ func TestFlowControlBasic(t *testing.T) {
}

func TestFlowControlAbort(t *testing.T) {
t.Parallel()

callBacker := &mockCallBacker{}
controller := NewTableFlowController(1024)
var wg sync.WaitGroup
Expand All @@ -296,6 +308,8 @@ func TestFlowControlAbort(t *testing.T) {
}

func TestFlowControlCallBack(t *testing.T) {
t.Parallel()

var consumedBytes uint64
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -401,6 +415,8 @@ func TestFlowControlCallBack(t *testing.T) {
}

func TestFlowControlCallBackNotBlockingRelease(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
controller := NewTableFlowController(512)
wg.Add(1)
Expand Down Expand Up @@ -441,6 +457,8 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) {
}

func TestFlowControlCallBackError(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
controller := NewTableFlowController(512)
wg.Add(1)
Expand Down Expand Up @@ -469,6 +487,8 @@ func TestFlowControlCallBackError(t *testing.T) {
}

func TestFlowControlConsumeLargerThanQuota(t *testing.T) {
t.Parallel()

controller := NewTableFlowController(1024)
err := controller.Consume(1, 2048, func() error {
t.Error("unreachable")
Expand Down

0 comments on commit 49ed662

Please sign in to comment.