Skip to content

Commit

Permalink
store: fix data race about KVStore.tikvClient (#24655)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored May 14, 2021
1 parent 80a557e commit ea7f0ca
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion store/tikv/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
CommitVersion: c.commitTS,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})

sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort)

// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
Expand Down
25 changes: 16 additions & 9 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ var oracleUpdateInterval = 2000

// KVStore contains methods to interact with a TiKV cluster.
type KVStore struct {
clusterID uint64
uuid string
oracle oracle.Oracle
client Client
clusterID uint64
uuid string
oracle oracle.Oracle
clientMu struct {
sync.RWMutex
client Client
}
pdClient pd.Client
regionCache *RegionCache
lockResolver *LockResolver
Expand Down Expand Up @@ -133,7 +136,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client
clusterID: pdClient.GetClusterID(context.TODO()),
uuid: uuid,
oracle: o,
client: reqCollapse{client},
pdClient: pdClient,
regionCache: NewRegionCache(pdClient),
kv: spkv,
Expand All @@ -142,6 +144,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client
closed: make(chan struct{}),
replicaReadSeed: rand.Uint32(),
}
store.clientMu.client = reqCollapse{client}
store.lockResolver = newLockResolver(store)

go store.runSafePointChecker()
Expand Down Expand Up @@ -205,7 +208,7 @@ func (s *KVStore) Close() error {
s.pdClient.Close()

close(s.closed)
if err := s.client.Close(); err != nil {
if err := s.GetTiKVClient().Close(); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -312,7 +315,7 @@ func (s *KVStore) SupportDeleteRange() (supported bool) {

// SendReq sends a request to region.
func (s *KVStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
sender := NewRegionRequestSender(s.regionCache, s.client)
sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
return sender.SendReq(bo, req, regionID, timeout)
}

Expand Down Expand Up @@ -343,12 +346,16 @@ func (s *KVStore) SetOracle(oracle oracle.Oracle) {

// SetTiKVClient resets the client instance.
func (s *KVStore) SetTiKVClient(client Client) {
s.client = client
s.clientMu.Lock()
defer s.clientMu.Unlock()
s.clientMu.client = client
}

// GetTiKVClient gets the client instance.
func (s *KVStore) GetTiKVClient() (client Client) {
return s.client
s.clientMu.RLock()
defer s.clientMu.RUnlock()
return s.clientMu.client
}

func (s *KVStore) getSafeTS(storeID uint64) uint64 {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff

req := c.buildPrewriteRequest(batch, txnSize)
for {
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort)

// If we fail to receive response for async commit prewrite, it will be undetermined whether this
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
zap.String("nextEndKey", kv.StrKey(s.nextEndKey)),
zap.Bool("reverse", s.reverse),
zap.Uint64("txnStartTS", s.startTS()))
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.GetTiKVClient())
var reqEndKey, reqStartKey []byte
var loc *KeyLocation
var err error
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool
Priority: kvrpcpb.CommandPri_Normal,
})

sender := NewRegionRequestSender(s.regionCache, s.client)
sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
resp, err := sender.SendReq(bo, req, batch.regionID, ReadTimeoutShort)

batchResp := singleBatchResp{resp: resp}
Expand Down

0 comments on commit ea7f0ca

Please sign in to comment.