Skip to content

Commit

Permalink
Fix the TSO consistency test
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 21, 2023
1 parent 7f156d9 commit 45cf012
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ ci-test-job: install-tools dashboard-ui

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso

test-tso: install-tools
# testing TSO function & consistency...
@$(FAILPOINT_ENABLE)
CGO_ENABLED=1 go test -race -tags without_dashboard,tso_full_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

test-tso-function: install-tools
# testing TSO function...
@$(FAILPOINT_ENABLE)
Expand Down
27 changes: 17 additions & 10 deletions tests/server/tso/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/grpcutil"
Expand Down Expand Up @@ -63,6 +64,7 @@ func (suite *tsoConsistencyTestSuite) TearDownSuite() {

// TestSynchronizedGlobalTSO is used to test the synchronized way of global TSO generation.
func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSO() {
re := suite.Require()
dcLocationConfig := map[string]string{
"pd1": "dc-1",
"pd2": "dc-2",
Expand All @@ -77,7 +79,6 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSO() {
re.NoError(err)
re.NoError(cluster.RunInitialServers())

re := suite.Require()
cluster.WaitAllLeaders(re, dcLocationConfig)

suite.leaderServer = cluster.GetLeaderServer()
Expand All @@ -95,12 +96,12 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSO() {
// Get some local TSOs first
oldLocalTSOs := make([]*pdpb.Timestamp, 0, dcLocationNum)
for _, dcLocation := range dcLocationConfig {
localTSO := suite.getTimestampByDC(ctx, cluster, dcLocation)
localTSO := suite.getTimestampByDC(ctx, re, cluster, dcLocation)
oldLocalTSOs = append(oldLocalTSOs, localTSO)
re.Equal(-1, tsoutil.CompareTimestamp(maxGlobalTSO, localTSO))
}
// Get a global TSO then
globalTSO := suite.getTimestampByDC(ctx, cluster, tso.GlobalDCLocation)
globalTSO := suite.getTimestampByDC(ctx, re, cluster, tso.GlobalDCLocation)
for _, oldLocalTSO := range oldLocalTSOs {
re.Equal(1, tsoutil.CompareTimestamp(globalTSO, oldLocalTSO))
}
Expand All @@ -110,15 +111,20 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSO() {
// Get some local TSOs again
newLocalTSOs := make([]*pdpb.Timestamp, 0, dcLocationNum)
for _, dcLocation := range dcLocationConfig {
newLocalTSOs = append(newLocalTSOs, suite.getTimestampByDC(ctx, cluster, dcLocation))
newLocalTSOs = append(newLocalTSOs, suite.getTimestampByDC(ctx, re, cluster, dcLocation))
}
for _, newLocalTSO := range newLocalTSOs {
re.Equal(-1, tsoutil.CompareTimestamp(maxGlobalTSO, newLocalTSO))
}
}
}

func (suite *tsoConsistencyTestSuite) getTimestampByDC(ctx context.Context, cluster *tests.TestCluster, dcLocation string) *pdpb.Timestamp {
func (suite *tsoConsistencyTestSuite) getTimestampByDC(
ctx context.Context,
re *require.Assertions,
cluster *tests.TestCluster,
dcLocation string,
) *pdpb.Timestamp {
req := &pdpb.TsoRequest{
Header: testutil.NewRequestHeader(suite.leaderServer.GetClusterID()),
Count: tsoCount,
Expand All @@ -138,6 +144,7 @@ func (suite *tsoConsistencyTestSuite) getTimestampByDC(ctx context.Context, clus
}

func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSOOverflow() {
re := suite.Require()
dcLocationConfig := map[string]string{
"pd1": "dc-1",
"pd2": "dc-2",
Expand All @@ -152,7 +159,6 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSOOverflow() {
re.NoError(err)
re.NoError(cluster.RunInitialServers())

re := suite.Require()
cluster.WaitAllLeaders(re, dcLocationConfig)

suite.leaderServer = cluster.GetLeaderServer()
Expand All @@ -166,11 +172,12 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSOOverflow() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/globalTSOOverflow", `return(true)`))
suite.getTimestampByDC(ctx, cluster, tso.GlobalDCLocation)
suite.getTimestampByDC(ctx, re, cluster, tso.GlobalDCLocation)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/globalTSOOverflow"))
}

func (suite *tsoConsistencyTestSuite) TestLocalAllocatorLeaderChange() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/mockLocalAllocatorLeaderChange", `return(true)`))
dcLocationConfig := map[string]string{
"pd1": "dc-1",
Expand All @@ -184,7 +191,6 @@ func (suite *tsoConsistencyTestSuite) TestLocalAllocatorLeaderChange() {
re.NoError(err)
re.NoError(cluster.RunInitialServers())

re := suite.Require()
cluster.WaitAllLeaders(re, dcLocationConfig)

suite.leaderServer = cluster.GetLeaderServer()
Expand All @@ -197,11 +203,12 @@ func (suite *tsoConsistencyTestSuite) TestLocalAllocatorLeaderChange() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
suite.getTimestampByDC(ctx, cluster, tso.GlobalDCLocation)
suite.getTimestampByDC(ctx, re, cluster, tso.GlobalDCLocation)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/mockLocalAllocatorLeaderChange"))
}

func (suite *tsoConsistencyTestSuite) TestLocalTSO() {
re := suite.Require()
dcLocationConfig := map[string]string{
"pd1": "dc-1",
"pd2": "dc-2",
Expand Down Expand Up @@ -232,6 +239,7 @@ func (suite *tsoConsistencyTestSuite) checkTSOUnique(tso *pdpb.Timestamp) bool {
}

func (suite *tsoConsistencyTestSuite) TestLocalTSOAfterMemberChanged() {
re := suite.Require()
dcLocationConfig := map[string]string{
"pd1": "dc-1",
"pd2": "dc-2",
Expand All @@ -246,7 +254,6 @@ func (suite *tsoConsistencyTestSuite) TestLocalTSOAfterMemberChanged() {
re.NoError(err)
re.NoError(cluster.RunInitialServers())

re := suite.Require()
cluster.WaitAllLeaders(re, dcLocationConfig)

leaderServer := cluster.GetLeaderServer()
Expand Down

0 comments on commit 45cf012

Please sign in to comment.