Skip to content

Commit

Permalink
ddl,test: fix unstable test TestParallelDDLBeforeRunDDLJob (#37873)
Browse files Browse the repository at this point in the history
close #37864
  • Loading branch information
YangKeao authored Sep 16, 2022
1 parent b37a48d commit 9d83a35
Showing 1 changed file with 16 additions and 33 deletions.
49 changes: 16 additions & 33 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
Expand All @@ -44,7 +42,6 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

// TestShowCreateTable tests the result of "show create table" when we are running "add index" or "add column".
Expand Down Expand Up @@ -1518,7 +1515,7 @@ func TestDDLIfExists(t *testing.T) {
// This test is used to simulate the following conditions:
// In a cluster, TiDB "a" executes the DDL.
// TiDB "b" fails to load schema, then TiDB "b" executes the DDL statement associated with the DDL statement executed by "a".
func TestParallelDDLBeforeRunDDLJo(t *testing.T) {
func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database test_db_state default charset utf8 default collate utf8_bin")
Expand All @@ -1533,38 +1530,25 @@ func TestParallelDDLBeforeRunDDLJo(t *testing.T) {
tk2.MustExec("use test_db_state")

intercept := &ddl.TestInterceptor{}
firstConnID := uint64(1)
finishedCnt := int32(0)
interval := 5 * time.Millisecond
var sessionCnt int32 // sessionCnt is the number of sessions that goes into the function of OnGetInfoSchema.

var sessionToStart sync.WaitGroup // sessionToStart is a waitgroup to wait for two session to get the same information schema
sessionToStart.Add(2)
firstDDLFinished := make(chan struct{})

intercept.OnGetInfoSchemaExported = func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
// The following code is for testing.
// Make sure the two sessions get the same information schema before executing DDL.
// After the first session executes its DDL, then the second session executes its DDL.
var info infoschema.InfoSchema
atomic.AddInt32(&sessionCnt, 1)
for {
// Make sure there are two sessions running here.
if atomic.LoadInt32(&sessionCnt) == 2 {
info = is
break
}
// Print log to notify if TestParallelDDLBeforeRunDDLJob hang up
log.Info("sleep in TestParallelDDLBeforeRunDDLJob", zap.String("interval", interval.String()))
time.Sleep(interval)
}
sessionToStart.Done()
sessionToStart.Wait()
info = is

// Make sure the two session have got the same information schema. And the first session can continue to go on,
// or the first session finished this SQL(seCnt = finishedCnt), then other sessions can continue to go on.
currID := ctx.GetSessionVars().ConnectionID
for {
seCnt := atomic.LoadInt32(&sessionCnt)
// Make sure the two session have got the same information schema. And the first session can continue to go on,
// or the first session finished this SQL(seCnt = finishedCnt), then other sessions can continue to go on.
if currID == firstConnID || seCnt == finishedCnt {
break
}
// Print log to notify if TestParallelDDLBeforeRunDDLJob hang up
log.Info("sleep in TestParallelDDLBeforeRunDDLJob", zap.String("interval", interval.String()))
time.Sleep(interval)
if currID != 1 {
<-firstDDLFinished
}

return info
Expand All @@ -1575,12 +1559,11 @@ func TestParallelDDLBeforeRunDDLJo(t *testing.T) {
// Make sure the connection 1 executes a SQL before the connection 2.
// And the connection 2 executes a SQL with an outdated information schema.
var wg util.WaitGroupWrapper

wg.Run(func() {
tk1.Session().SetConnectionID(firstConnID)
tk1.Session().SetConnectionID(1)
tk1.MustExec("alter table test_table drop column c2")
// Sleep a while to make sure the connection 2 break out the first for loop in OnGetInfoSchemaExported, otherwise atomic.LoadInt32(&sessionCnt) == 2 will be false forever.
time.Sleep(100 * time.Millisecond)
atomic.StoreInt32(&sessionCnt, finishedCnt)
firstDDLFinished <- struct{}{}
})
wg.Run(func() {
tk2.Session().SetConnectionID(2)
Expand Down

0 comments on commit 9d83a35

Please sign in to comment.