Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to integrate with the PD HTTP client #1049

Merged
merged 5 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/client-go/v2

go 1.21

replace github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864

require (
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -111,8 +114,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc h1:IUg0j2nWoGYj3FQ3vA3vg97fPSpJEZQrDpgF8RkMLEU=
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc/go.mod h1:wfHRc4iYaqJiOQZCHcrF+r4hYnkGDaYWDfcicee//pc=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module integration_tests

go 1.21

replace github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864

require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -507,8 +509,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb h1:hAcH9tFjQzQ3+ofrAHm4ajOTLliYCOfXpj3+boKOtac=
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb/go.mod h1:E+6qtPu8fJm5kNjvKWPVFqSgNAFPk07y2EjD03GWzuI=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
Expand Down
24 changes: 12 additions & 12 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
Expand All @@ -133,7 +133,7 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
}
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
Expand All @@ -142,7 +142,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(100 * time.Millisecond)
Expand All @@ -153,11 +153,11 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Set DC label for store 1.
// Mock PD server not support get min resolved ts by stores.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
Expand All @@ -184,7 +184,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
Expand All @@ -196,7 +196,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
Expand All @@ -207,10 +207,10 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
}
// Make sure the store's min resolved ts is not initialized.
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
Expand All @@ -221,10 +221,10 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
}
// Make sure the store's min resolved ts is not regarded as MaxUint64.
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
Expand All @@ -236,7 +236,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
}
// Make sure the minSafeTS can advance.
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TearDownTest() {
Expand Down
4 changes: 4 additions & 0 deletions tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pdhttp "github.com/tikv/pd/client/http"
)

// Storage represent the kv.Storage runs on TiKV.
Expand Down Expand Up @@ -69,6 +70,9 @@ type Storage interface {
// GetTiKVClient gets the TiKV client.
GetTiKVClient() Client

// GetPDHTTPClient gets the PD HTTP client.
GetPDHTTPClient() pdhttp.Client

// Closed returns the closed channel.
Closed() <-chan struct{}

Expand Down
63 changes: 54 additions & 9 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -115,7 +116,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient *util.PDHTTPClient
pdHttpClient pdhttp.Client
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
Expand Down Expand Up @@ -147,6 +148,8 @@ type KVStore struct {
gP Pool
}

var _ Storage = (*KVStore)(nil)

// Go run the function in a separate goroutine.
func (s *KVStore) Go(f func()) error {
return s.gP.Run(f)
Expand Down Expand Up @@ -197,7 +200,7 @@ func WithPool(gp Pool) Option {
// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
return func(o *KVStore) {
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
o.pdHttpClient = pdhttp.NewClient(pdaddrs, pdhttp.WithTLSConfig(tlsConf))
}
}

Expand Down Expand Up @@ -444,6 +447,11 @@ func (s *KVStore) GetPDClient() pd.Client {
return s.pdClient
}

// GetPDHTTPClient returns the PD HTTP client.
func (s *KVStore) GetPDHTTPClient() pdhttp.Client {
return s.pdHttpClient
}

// SupportDeleteRange gets the storage support delete range or not.
func (s *KVStore) SupportDeleteRange() (supported bool) {
return !s.mock
Expand Down Expand Up @@ -598,28 +606,31 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
err error
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
storeIDs := make([]uint64, len(stores))
if s.pdHttpClient != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
storeIDs[i] = store.StoreID()
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
_, storeMinResolvedTSs, err = s.getGetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
}
}

for i, store := range stores {
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) {
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()

var safeTS uint64
var (
safeTS uint64
storeIDStr = strconv.FormatUint(storeID, 10)
)
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
resp, err := tikvClient.SendRequest(
Expand Down Expand Up @@ -655,7 +666,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
safeTSTime := oracle.GetTimeFromTS(safeTS)
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
}(ctx, wg, storeID, storeAddr, storeIDs[i])
}(ctx, wg, storeID, storeAddr)
}

txnScopeMap := make(map[string][]uint64)
Expand All @@ -672,6 +683,40 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) getGetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
var (
minResolvedTS uint64
storeMinResolvedTSs map[uint64]uint64
err error
)
minResolvedTS, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
return 0, nil, err
}
if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil {
// Need to make sure successfully get from real pd.
if storeMinResolvedTSs != nil {
for storeID, v := range storeMinResolvedTSs {
if v != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
storeMinResolvedTSs[storeID] = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp)))
}
}
}
} else if tmp, ok := val.(int); ok {
// Should be val.(uint64) but failpoint doesn't support that.
// ci's store id is 1, we can change it if we have more stores.
// but for pool ci it's no need to do that :(
minResolvedTS = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp)))
}

}
return minResolvedTS, storeMinResolvedTSs, err
}

var (
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
Expand Down
Loading
Loading