Skip to content

Commit

Permalink
Fix the integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 7, 2023
1 parent acf7555 commit 5b11c78
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 75 deletions.
70 changes: 17 additions & 53 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ type apiTestSuite struct {
}

func (s *apiTestSuite) SetupTest() {
require := s.Require()
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().NoError(err)
require.NoError(err)
rpcClient := tikv.NewRPCClient()
s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs, nil))
require.NoError(err)
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
}
Expand Down Expand Up @@ -123,34 +124,26 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
s.waitForMinSafeTS(dcLabel, 100)
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) {
s.Eventually(func() bool {
return s.store.GetMinSafeTS(txnScope) == ts
}, time.Second, 200*time.Millisecond)
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
require := s.Require()
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
Expand All @@ -173,15 +166,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}

s.waitForMinSafeTS(dcLabel, 150)
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
Expand All @@ -197,44 +182,23 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the store's min resolved ts is not initialized.
s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64)
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the store's min resolved ts is not regarded as MaxUint64.
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the minSafeTS can advance.
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}
Expand Down
40 changes: 18 additions & 22 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func WithPool(gp Pool) Option {
}

// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
func WithPDHTTPClient(pdAddrs []string, tlsConf *tls.Config) Option {
return func(o *KVStore) {
o.pdHttpClient = pdhttp.NewClient(pdaddrs, pdhttp.WithTLSConfig(tlsConf))
o.pdHttpClient = pdhttp.NewClient(pdAddrs, pdhttp.WithTLSConfig(tlsConf))
}
}

Expand Down Expand Up @@ -611,7 +611,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
for i, store := range stores {
storeIDs[i] = store.StoreID()
}
_, storeMinResolvedTSs, err = s.getGetMinResolvedTSByStoresIDs(ctx, storeIDs)
_, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
Expand Down Expand Up @@ -651,8 +651,10 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
logutil.BgLogger().Info("get min resolved ts from tikv", zap.Uint64("store-id", storeID), zap.Uint64("safe-ts", safeTS))
} else {
safeTS = storeMinResolvedTSs[storeID]
logutil.BgLogger().Info("get min resolved ts from pd", zap.Uint64("store-id", storeID), zap.Uint64("safe-ts", safeTS))
}

_, preSafeTS := s.getSafeTS(storeID)
Expand Down Expand Up @@ -683,7 +685,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) getGetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
var (
minResolvedTS uint64
storeMinResolvedTSs map[uint64]uint64
Expand All @@ -694,25 +696,19 @@ func (s *KVStore) getGetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs [
return 0, nil, err
}
if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil {
// Need to make sure successfully get from real pd.
if storeMinResolvedTSs != nil {
for storeID, v := range storeMinResolvedTSs {
if v != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
storeMinResolvedTSs[storeID] = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp)))
}
}
injectedTS, ok := val.(int)
if !ok {
return minResolvedTS, storeMinResolvedTSs, err
}
minResolvedTS = uint64(injectedTS)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(injectedTS)))
// Currently we only have a store 1 in the test, so it's OK to inject the same min resolved TS for all stores here.
for storeID, v := range storeMinResolvedTSs {
if v != 0 && v != math.MaxUint64 {
storeMinResolvedTSs[storeID] = uint64(injectedTS)
logutil.BgLogger().Info("inject store min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(injectedTS)))
}
} else if tmp, ok := val.(int); ok {
// Should be val.(uint64) but failpoint doesn't support that.
// ci's store id is 1, we can change it if we have more stores.
// but for pool ci it's no need to do that :(
minResolvedTS = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp)))
}

}
return minResolvedTS, storeMinResolvedTSs, err
}
Expand All @@ -729,7 +725,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
Expand Down

0 comments on commit 5b11c78

Please sign in to comment.