diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go index cfc1f09e5405a..8eeb9f676761e 100644 --- a/store/mockstore/mocktikv/cluster.go +++ b/store/mockstore/mocktikv/cluster.go @@ -178,19 +178,20 @@ func (c *Cluster) GetStoreByAddr(addr string) *metapb.Store { } // GetAndCheckStoreByAddr checks and returns a Store's meta by an addr -func (c *Cluster) GetAndCheckStoreByAddr(addr string) (*metapb.Store, error) { +func (c *Cluster) GetAndCheckStoreByAddr(addr string) (ss []*metapb.Store, err error) { c.RLock() defer c.RUnlock() for _, s := range c.stores { if s.cancel { - return nil, context.Canceled + err = context.Canceled + return } if s.meta.GetAddress() == addr { - return proto.Clone(s.meta).(*metapb.Store), nil + ss = append(ss, proto.Clone(s.meta).(*metapb.Store)) } } - return nil, nil + return } // AddStore add a new Store to the cluster. @@ -209,6 +210,15 @@ func (c *Cluster) RemoveStore(storeID uint64) { delete(c.stores, storeID) } +// MarkTombstone marks store as tombstone. +func (c *Cluster) MarkTombstone(storeID uint64) { + c.Lock() + defer c.Unlock() + nm := *c.stores[storeID].meta + nm.State = metapb.StoreState_Tombstone + c.stores[storeID].meta = &nm +} + // UpdateStoreAddr updates store address for cluster. func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) { c.Lock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 8d0fb5ce39c98..290798cd087ad 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -740,18 +740,20 @@ func NewRPCClient(cluster *Cluster, mvccStore MVCCStore) *RPCClient { } func (c *RPCClient) getAndCheckStoreByAddr(addr string) (*metapb.Store, error) { - store, err := c.Cluster.GetAndCheckStoreByAddr(addr) + stores, err := c.Cluster.GetAndCheckStoreByAddr(addr) if err != nil { return nil, err } - if store == nil { + if len(stores) == 0 { return nil, errors.New("connect fail") } - if store.GetState() == metapb.StoreState_Offline || - store.GetState() == metapb.StoreState_Tombstone { - return nil, errors.New("connection refused") + for _, store := range stores { + if store.GetState() != metapb.StoreState_Offline && + store.GetState() != metapb.StoreState_Tombstone { + return store, nil + } } - return store, nil + return nil, errors.New("connection refused") } func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, error) { diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 456a56ae347b7..e629d81a5a0a8 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -1522,7 +1522,7 @@ func (s *Store) reResolve(c *RegionCache) { // we cannot do backoff in reResolve loop but try check other store and wait tick. return } - if store == nil { + if store == nil || store.State == metapb.StoreState_Tombstone { // store has be removed in PD, we should invalidate all regions using those store. logutil.BgLogger().Info("invalidate regions in removed store", zap.Uint64("store", s.storeID), zap.String("add", s.addr)) diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index c2f2d6e9d7cf3..b2dc597524b41 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -833,6 +833,33 @@ func (s *testRegionCacheSuite) TestReplaceNewAddrAndOldOfflineImmediately(c *C) c.Assert(getVal, BytesEquals, testValue) } +func (s *testRegionCacheSuite) TestReplaceStore(c *C) { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &RawKVClient{ + clusterID: 0, + regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore), + } + defer client.Close() + testKey := []byte("test_key") + testValue := []byte("test_value") + err := client.Put(testKey, testValue) + c.Assert(err, IsNil) + + s.cluster.MarkTombstone(s.store1) + store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(s.store1)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.RemovePeer(s.region1, s.peer1) + s.cluster.ChangeLeader(s.region1, peer3) + + err = client.Put(testKey, testValue) + c.Assert(err, IsNil) +} + func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) { // ['' - 'm' - 'z'] region2 := s.cluster.AllocID() diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index f1523456427b2..85cf978c85454 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -604,7 +604,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil { // store not match - logutil.BgLogger().Warn("tikv reports `StoreNotMatch` retry later", + logutil.BgLogger().Debug("tikv reports `StoreNotMatch` retry later", zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx)) ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh)