Skip to content

Commit

Permalink
MinSafeTS support TiFlash (tikv#642)
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <[email protected]>
  • Loading branch information
hehechen authored and hawkingrei committed Jan 10, 2023
1 parent 0bf06e8 commit 497cb2c
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 21 deletions.
29 changes: 24 additions & 5 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,16 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
c.storeMu.Lock()
defer c.storeMu.Unlock()
c.storeMu.stores[id] = &Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
addr: addr,
peerAddr: peerAddr,
}
}

Expand Down Expand Up @@ -1381,15 +1383,24 @@ func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
})
}
stores = append(stores, &Store{
addr: store.addr,
storeID: store.storeID,
labels: storeLabel,
addr: store.addr,
peerAddr: store.peerAddr,
storeID: store.storeID,
labels: storeLabel,
storeType: typ,
})
}
}
return stores
}

// GetAllStores gets TiKV and TiFlash stores.
func (c *RegionCache) GetAllStores() []*Store {
stores := c.GetStoresByType(tikvrpc.TiKV)
tiflashStores := c.GetStoresByType(tikvrpc.TiFlash)
return append(stores, tiflashStores...)
}

func filterUnavailablePeers(region *pd.Region) {
if len(region.DownPeers) == 0 {
return
Expand Down Expand Up @@ -1830,6 +1841,7 @@ func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*St
res = append(res, &Store{
storeID: s.GetId(),
addr: s.GetAddress(),
peerAddr: s.GetPeerAddress(),
saddr: s.GetStatusAddress(),
storeType: tikvrpc.GetStoreTypeByMeta(s),
labels: s.GetLabels(),
Expand Down Expand Up @@ -2148,6 +2160,7 @@ func (r *Region) ContainsByEnd(key []byte) bool {
// Store contains a kv process's address.
type Store struct {
addr string // loaded store address
peerAddr string // TiFlash Proxy use peerAddr
saddr string // loaded store status address
storeID uint64 // store's id
state uint64 // unsafe store storeState
Expand Down Expand Up @@ -2235,6 +2248,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, e
return "", errors.Errorf("empty store(%d) address", s.storeID)
}
s.addr = addr
s.peerAddr = store.GetPeerAddress()
s.saddr = store.GetStatusAddress()
s.storeType = tikvrpc.GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
Expand Down Expand Up @@ -2281,7 +2295,7 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
storeType := tikvrpc.GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
Expand Down Expand Up @@ -2503,6 +2517,11 @@ func (s *Store) GetAddr() string {
return s.addr
}

// GetPeerAddr returns the peer address of the store
func (s *Store) GetPeerAddr() string {
return s.peerAddr
}

func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
start := time.Now()
defer func() {
Expand Down
37 changes: 23 additions & 14 deletions internal/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ var _ cluster.Cluster = &Cluster{}

// Cluster simulates a TiKV cluster. It focuses on management and the change of
// meta data. A Cluster mainly includes following 3 kinds of meta data:
// 1) Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2) Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3) Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
// 1. Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2. Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3. Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
type Cluster struct {
sync.RWMutex
id uint64
Expand Down Expand Up @@ -224,7 +224,7 @@ func (c *Cluster) AddStore(storeID uint64, addr string, labels ...*metapb.StoreL
c.Lock()
defer c.Unlock()

c.stores[storeID] = newStore(storeID, addr, labels...)
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
}

// RemoveStore removes a Store from the cluster.
Expand All @@ -248,7 +248,15 @@ func (c *Cluster) MarkTombstone(storeID uint64) {
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
c.stores[storeID] = newStore(storeID, addr, labels...)
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
}

// UpdateStorePeerAddr updates store peer address for cluster.
func (c *Cluster) UpdateStorePeerAddr(storeID uint64, peerAddr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
addr := c.stores[storeID].meta.Address
c.stores[storeID] = newStore(storeID, addr, peerAddr, labels...)
}

// GetRegion returns a Region's meta and leader ID.
Expand Down Expand Up @@ -691,12 +699,13 @@ type Store struct {
cancel bool // return context.Cancelled error when cancel is true.
}

func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
func newStore(storeID uint64, addr string, peerAddr string, labels ...*metapb.StoreLabel) *Store {
return &Store{
meta: &metapb.Store{
Id: storeID,
Address: addr,
Labels: labels,
Id: storeID,
Address: addr,
PeerAddress: peerAddr,
Labels: labels,
},
}
}
Expand Down
5 changes: 4 additions & 1 deletion tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,13 +521,16 @@ func (s *KVStore) safeTSUpdater() {
}

func (s *KVStore) updateSafeTS(ctx context.Context) {
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
stores := s.regionCache.GetAllStores()
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{
Expand Down
124 changes: 124 additions & 0 deletions tikv/kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2022 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestKV(t *testing.T) {
suite.Run(t, new(testKVSuite))
}

type testKVSuite struct {
suite.Suite
store *KVStore
cluster *mocktikv.Cluster
tikvStoreID uint64
tiflashStoreID uint64
tiflashPeerStoreID uint64
}

func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
s.Require().Nil(err)

s.store = store
s.cluster = cluster

storeIDs, _, _, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2)
s.tikvStoreID = storeIDs[0]
s.tiflashStoreID = storeIDs[1]
tiflashPeerAddrID := cluster.AllocIDs(1)
s.tiflashPeerStoreID = tiflashPeerAddrID[0]

s.cluster.UpdateStorePeerAddr(s.tiflashStoreID, s.storeAddr(s.tiflashPeerStoreID), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.store.regionCache.SetRegionCacheStore(s.tikvStoreID, s.storeAddr(s.tikvStoreID), s.storeAddr(s.tikvStoreID), tikvrpc.TiKV, 1, nil)
var labels []*metapb.StoreLabel
labels = append(labels, &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.store.regionCache.SetRegionCacheStore(s.tiflashStoreID, s.storeAddr(s.tiflashStoreID), s.storeAddr(s.tiflashPeerStoreID), tikvrpc.TiFlash, 1, labels)

}

func (s *testKVSuite) TearDownTest() {
s.Require().Nil(s.store.Close())
}

func (s *testKVSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}

type storeSafeTsMockClient struct {
Client
requestCount int32
testSuite *testKVSuite
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
if addr == c.testSuite.storeAddr(c.testSuite.tiflashPeerStoreID) {
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 80}
} else {
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 100}
}
return resp, nil
}

func (c *storeSafeTsMockClient) Close() error {
return c.Client.Close()
}

func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *testKVSuite) TestMinSafeTs() {
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
testSuite: s,
}
s.store.SetTiKVClient(&mockClient)

// wait for updateMinSafeTS
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 80 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}
2 changes: 1 addition & 1 deletion tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s StoreProbe) SaveSafePoint(v uint64) error {

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
s.regionCache.SetRegionCacheStore(id, storeType, state, labels)
s.regionCache.SetRegionCacheStore(id, "", "", storeType, state, labels)
}

// SetSafeTS is used to set safeTS for the store with `storeID`
Expand Down

0 comments on commit 497cb2c

Please sign in to comment.