Refactor cluster cache. #118

Merged · 12 commits · May 30, 2016
22 changes: 11 additions & 11 deletions cmd/Godeps/Godeps.json

241 changes: 171 additions & 70 deletions cmd/vendor/github.com/pingcap/kvproto/pkg/pdpb/pdpb.pb.go

232 changes: 232 additions & 0 deletions server/cache.go
@@ -0,0 +1,232 @@
package server

import (
	"math/rand"
	"sync"

	"github.com/golang/protobuf/proto"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/pdpb"
)

// RegionInfo is the region cache info.
type RegionInfo struct {
	region *metapb.Region
	// leader peer
	peer *metapb.Peer
}

func (r *RegionInfo) clone() *RegionInfo {
	return &RegionInfo{
		region: proto.Clone(r.region).(*metapb.Region),
		peer:   proto.Clone(r.peer).(*metapb.Peer),
	}
}
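
// cloneIsolationSketch is not part of the original change; it is a minimal
// sketch (hypothetical IDs; proto2-style pointer fields assumed) of why
// clone matters: readers get deep copies, so mutating a returned RegionInfo
// never touches the cached protobufs.
func cloneIsolationSketch() {
	cached := &RegionInfo{
		region: &metapb.Region{Id: proto.Uint64(1)},
		peer:   &metapb.Peer{Id: proto.Uint64(10)},
	}
	dup := cached.clone()
	dup.region.Id = proto.Uint64(2) // mutate the copy only
	_ = cached.region.GetId()       // still 1: the cache is unaffected
}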

// RegionsInfo is the regions cache info.
type RegionsInfo struct {
	sync.RWMutex

	// region id -> RegionInfo
	leaderRegions map[uint64]*RegionInfo
	// store id -> region id -> struct{}
	storeLeaderRegions map[uint64]map[uint64]struct{}
}

func newRegionsInfo() *RegionsInfo {
	return &RegionsInfo{
		leaderRegions:      make(map[uint64]*RegionInfo),
		storeLeaderRegions: make(map[uint64]map[uint64]struct{}),
	}
}

// randRegion randomly selects a region from the region cache.
func (r *RegionsInfo) randRegion(storeID uint64) *RegionInfo {
	r.RLock()
	defer r.RUnlock()

	storeRegions, ok := r.storeLeaderRegions[storeID]
	if !ok {
		return nil
	}

	// Pick the randIdx-th region id of the set. Empty sets are always
	// deleted from storeLeaderRegions, so len(storeRegions) > 0 here.
	idx, randIdx, randRegionID := 0, rand.Intn(len(storeRegions)), uint64(0)
	for regionID := range storeRegions {
		if idx == randIdx {
			randRegionID = regionID
			break
		}

		idx++
	}

	region, ok := r.leaderRegions[randRegionID]
	if ok {
		return region.clone()
	}

	return nil
}
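
// randKeySketch is not part of the original change; it isolates the technique
// randRegion uses: draw a uniform random index with rand.Intn, then walk the
// map until that index is reached. Iteration order is irrelevant because the
// index itself is uniform over the set size.
func randKeySketch(set map[uint64]struct{}) uint64 {
	n := rand.Intn(len(set)) // panics on an empty set, as randRegion would
	for k := range set {
		if n == 0 {
			return k
		}
		n--
	}
	return 0 // unreachable for a non-empty set
}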

func (r *RegionsInfo) addRegion(region *metapb.Region, leaderPeer *metapb.Peer) {
	r.Lock()
	defer r.Unlock()

	regionID := region.GetId()
	cacheRegion, regionExist := r.leaderRegions[regionID]
	if regionExist {
		// If the region leader has changed, remove the old region from the old store's cache.
		oldLeaderPeer := cacheRegion.peer
		if oldLeaderPeer.GetId() != leaderPeer.GetId() {
			storeID := oldLeaderPeer.GetStoreId()
			storeRegions, storeExist := r.storeLeaderRegions[storeID]
			if storeExist {
				delete(storeRegions, regionID)
				if len(storeRegions) == 0 {
					delete(r.storeLeaderRegions, storeID)
				}
			}
		}
	}

	// TODO: If neither the region version nor the conf version has changed,
	// return directly without updating the cache.
	r.leaderRegions[regionID] = &RegionInfo{
		region: region,
		peer:   leaderPeer,
	}

Contributor: If the version or conf version doesn't change, I think we can return directly; no need to update the cache again.

Contributor (author): Agreed, we should add this check later, maybe in the next PR.

Contributor: So please add a TODO comment here.

	storeID := leaderPeer.GetStoreId()
	store, ok := r.storeLeaderRegions[storeID]
	if !ok {
		store = make(map[uint64]struct{})
		r.storeLeaderRegions[storeID] = store
	}
	store[regionID] = struct{}{}
}
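
// leaderChangeSketch is not part of the original change; it walks addRegion
// through a leader change (hypothetical IDs; proto2-style pointer fields
// assumed). The region's index entry moves from the old leader's store
// bucket to the new one, and the old store's now-empty bucket is deleted.
func leaderChangeSketch() {
	info := newRegionsInfo()
	region := &metapb.Region{Id: proto.Uint64(1)}
	peerA := &metapb.Peer{Id: proto.Uint64(10), StoreId: proto.Uint64(100)}
	peerB := &metapb.Peer{Id: proto.Uint64(11), StoreId: proto.Uint64(101)}

	info.addRegion(region, peerA) // storeLeaderRegions: {100: {1}}
	info.addRegion(region, peerB) // leader moved: {101: {1}}; store 100 is evicted
}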

func (r *RegionsInfo) removeRegion(regionID uint64) {
	r.Lock()
	defer r.Unlock()

	cacheRegion, ok := r.leaderRegions[regionID]
	if ok {
		storeID := cacheRegion.peer.GetStoreId()
		storeRegions, ok := r.storeLeaderRegions[storeID]
		if ok {
			delete(storeRegions, regionID)
			if len(storeRegions) == 0 {
				delete(r.storeLeaderRegions, storeID)
			}
		}

		delete(r.leaderRegions, regionID)
	}
}

// StoreInfo is the store cache info.
type StoreInfo struct {
	store *metapb.Store

	// store capacity info.
	stats *pdpb.StoreStats
}

func (s *StoreInfo) clone() *StoreInfo {
	return &StoreInfo{
		store: proto.Clone(s.store).(*metapb.Store),
		stats: proto.Clone(s.stats).(*pdpb.StoreStats),
	}
}

// usedRatio returns the ratio of used capacity to total capacity.
func (s *StoreInfo) usedRatio() float64 {
	if s.stats.GetCapacity() == 0 {
		return 0
	}

	return float64(s.stats.GetCapacity()-s.stats.GetAvailable()) / float64(s.stats.GetCapacity())
}
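
// usedRatioSketch is not part of the original change; it shows the arithmetic
// with hypothetical numbers (proto2-style pointer fields assumed): capacity
// 512 with 100 available gives (512-100)/512 = 0.8046875.
func usedRatioSketch() float64 {
	s := &StoreInfo{
		store: &metapb.Store{},
		stats: &pdpb.StoreStats{
			Capacity:  proto.Uint64(512),
			Available: proto.Uint64(100),
		},
	}
	return s.usedRatio() // 0.8046875
}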

// ClusterInfo is the cluster cache info.
type ClusterInfo struct {
	sync.RWMutex

	meta    *metapb.Cluster
	stores  map[uint64]*StoreInfo
	regions *RegionsInfo
}

func newClusterInfo() *ClusterInfo {
	return &ClusterInfo{
		stores:  make(map[uint64]*StoreInfo),
		regions: newRegionsInfo(),
	}
}

func (c *ClusterInfo) addStore(store *metapb.Store) {
	c.Lock()
	defer c.Unlock()

	storeInfo := &StoreInfo{
		store: store,
		stats: &pdpb.StoreStats{},
	}

	c.stores[store.GetId()] = storeInfo
}

func (c *ClusterInfo) removeStore(storeID uint64) {
	c.Lock()
	defer c.Unlock()

	delete(c.stores, storeID)
}

func (c *ClusterInfo) getStore(storeID uint64) *StoreInfo {
	c.RLock()
	defer c.RUnlock()

	store, ok := c.stores[storeID]
	if !ok {
		return nil
	}

	return store.clone()
}

func (c *ClusterInfo) getStores() map[uint64]*StoreInfo {
	c.RLock()
	defer c.RUnlock()

	stores := make(map[uint64]*StoreInfo, len(c.stores))
	for key, store := range c.stores {
		stores[key] = store.clone()
	}

	return stores
}

func (c *ClusterInfo) getMetaStores() []metapb.Store {
	c.RLock()
	defer c.RUnlock()

	stores := make([]metapb.Store, 0, len(c.stores))
	for _, store := range c.stores {
		stores = append(stores, *proto.Clone(store.store).(*metapb.Store))
	}

	return stores
}

func (c *ClusterInfo) setMeta(meta *metapb.Cluster) {
	c.Lock()
	defer c.Unlock()

	c.meta = meta
}

func (c *ClusterInfo) getMeta() *metapb.Cluster {
	c.RLock()
	defer c.RUnlock()

	return proto.Clone(c.meta).(*metapb.Cluster)
}
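
// clusterCacheSketch is not part of the original change; it strings the
// pieces together (hypothetical IDs and address; proto2-style pointer fields
// and the metapb.Store Address field are assumed).
func clusterCacheSketch() {
	c := newClusterInfo()
	c.setMeta(&metapb.Cluster{Id: proto.Uint64(1)})
	c.addStore(&metapb.Store{Id: proto.Uint64(100), Address: proto.String("127.0.0.1:20160")})

	if s := c.getStore(100); s != nil {
		// s is a clone; inspecting it cannot corrupt the cache.
		_ = s.usedRatio()
	}
	c.removeStore(100)
}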