diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 1d45b4917..a990c9cad 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -52,6 +52,7 @@ func (s *apiTestSuite) SetupTest() { pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) s.Require().NoError(err) rpcClient := tikv.NewRPCClient() + s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) // Set PD HTTP client. store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) s.store = store @@ -101,10 +102,8 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error { func (s *apiTestSuite) TestGetStoresMinResolvedTS() { util.EnableFailpoints() require := s.Require() - require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) - defer func() { - require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) - }() + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) + s.store.SetTiKVClient(&mockClient) // Set DC label for store 1. // Mock Cluster-level min resolved ts failed. @@ -124,8 +123,6 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() { s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) // Try to get the minimum resolved timestamp of the stores from PD. require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) - s.store.SetTiKVClient(&mockClient) var retryCount int for s.store.GetMinSafeTS(dcLabel) != 100 { time.Sleep(100 * time.Millisecond) @@ -136,20 +133,16 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() { } require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) - require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) } func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { util.EnableFailpoints() - // Try to get the minimum resolved timestamp of the cluster from PD. require := s.Require() - require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) - defer func() { - require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) - }() - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) + // Try to get the minimum resolved timestamp of the cluster from PD. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) var retryCount int for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { time.Sleep(100 * time.Millisecond) @@ -165,9 +158,6 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { // Set DC label for store 1. // Mock PD server not support get min resolved ts by stores. require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) - defer func() { - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - }() dcLabel := "testDC" restore := config.UpdateGlobal(func(conf *config.Config) { conf.TxnScope = dcLabel @@ -182,7 +172,6 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { } storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) - // Try to get the minimum resolved timestamp of the store from TiKV. retryCount = 0 for s.store.GetMinSafeTS(dcLabel) != 150 { @@ -195,18 +184,15 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) } func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { util.EnableFailpoints() require := s.Require() - require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) - defer func() { - require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) - }() - mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) + // Make sure the store's min resolved ts is not initialized. mockClient.SetKVSafeTS(0) // Try to get the minimum resolved timestamp of the cluster from TiKV. @@ -219,6 +205,9 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { } retryCount++ } + // Make sure the store's min resolved ts is not initialized. + require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) // Try to get the minimum resolved timestamp of the cluster from PD. require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) @@ -230,6 +219,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { } retryCount++ } + // Make sure the store's min resolved ts is not regarded as MaxUint64. require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) @@ -244,6 +234,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { } retryCount++ } + // Make sure the minSafeTS can advance. require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) } @@ -252,4 +243,5 @@ func (s *apiTestSuite) TearDownTest() { if s.store != nil { s.Require().Nil(s.store.Close()) } + s.Require().NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) }