*: reduce the store clone (#1410)
* reduce store clone

Signed-off-by: rleungx <[email protected]>
rleungx authored and huachaohuang committed Jan 30, 2019
1 parent 21a3537 commit 9db4a56
Showing 43 changed files with 830 additions and 423 deletions.
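The pattern running through these diffs: `core.StoreInfo` is treated as read-only. Callers use getters such as `GetMeta()`, `GetState()`, and `GetLastHeartbeatTS()` instead of reaching into fields, and every update goes through `Clone` with functional options like `core.SetStoreState` or `core.SetLastHeartbeatTS`, so a fresh copy replaces the cached store instead of mutating shared state. Below is a minimal sketch of that option-based `Clone`, with illustrative field names and an assumed `StoreCreateOption` type; it is not the exact PD implementation.

```go
package core

import (
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/pdpb"
)

// StoreInfo holds a store's meta and runtime status; callers read it through
// getters and never mutate it in place.
type StoreInfo struct {
	meta            *metapb.Store
	stats           *pdpb.StoreStats
	leaderWeight    float64
	regionWeight    float64
	lastHeartbeatTS time.Time
}

// StoreCreateOption applies one change to a freshly copied StoreInfo.
type StoreCreateOption func(*StoreInfo)

// SetStoreState changes the state recorded in the copied meta.
func SetStoreState(state metapb.StoreState) StoreCreateOption {
	return func(s *StoreInfo) { s.meta.State = state }
}

// SetStoreStats replaces the heartbeat statistics.
func SetStoreStats(stats *pdpb.StoreStats) StoreCreateOption {
	return func(s *StoreInfo) { s.stats = stats }
}

// SetLastHeartbeatTS records when the last heartbeat arrived.
func SetLastHeartbeatTS(ts time.Time) StoreCreateOption {
	return func(s *StoreInfo) { s.lastHeartbeatTS = ts }
}

// Clone copies the StoreInfo, deep-copies the protobuf meta so options such
// as SetStoreState cannot touch the original, then applies the options.
func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
	store := *s
	store.meta = proto.Clone(s.meta).(*metapb.Store)
	for _, opt := range opts {
		opt(&store)
	}
	return &store
}
```

With this shape, the test change in server/api/store_test.go reads naturally: `store.Clone(core.SetLastHeartbeatTS(...))` yields a new StoreInfo per scenario while the original stays valid.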
34 changes: 17 additions & 17 deletions server/api/store.go
@@ -71,40 +71,40 @@ const (
func newStoreInfo(opt *server.ScheduleConfig, store *core.StoreInfo) *StoreInfo {
s := &StoreInfo{
Store: &MetaStore{
Store: store.Store,
StateName: store.State.String(),
Store: store.GetMeta(),
StateName: store.GetState().String(),
},
Status: &StoreStatus{
Capacity: typeutil.ByteSize(store.Stats.GetCapacity()),
Available: typeutil.ByteSize(store.Stats.GetAvailable()),
LeaderCount: store.LeaderCount,
LeaderWeight: store.LeaderWeight,
Capacity: typeutil.ByteSize(store.GetCapacity()),
Available: typeutil.ByteSize(store.GetAvailable()),
LeaderCount: store.GetLeaderCount(),
LeaderWeight: store.GetLeaderWeight(),
LeaderScore: store.LeaderScore(0),
LeaderSize: store.LeaderSize,
RegionCount: store.RegionCount,
RegionWeight: store.RegionWeight,
LeaderSize: store.GetLeaderSize(),
RegionCount: store.GetRegionCount(),
RegionWeight: store.GetRegionWeight(),
RegionScore: store.RegionScore(opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.RegionSize,
SendingSnapCount: store.Stats.GetSendingSnapCount(),
ReceivingSnapCount: store.Stats.GetReceivingSnapCount(),
ApplyingSnapCount: store.Stats.GetApplyingSnapCount(),
IsBusy: store.Stats.GetIsBusy(),
RegionSize: store.GetRegionSize(),
SendingSnapCount: store.GetSendingSnapCount(),
ReceivingSnapCount: store.GetReceivingSnapCount(),
ApplyingSnapCount: store.GetApplyingSnapCount(),
IsBusy: store.GetIsBusy(),
},
}

if store.Stats != nil {
if store.GetStoreStats() != nil {
startTS := store.GetStartTS()
s.Status.StartTS = &startTS
}
if lastHeartbeat := store.LastHeartbeatTS; !lastHeartbeat.IsZero() {
if lastHeartbeat := store.GetLastHeartbeatTS(); !lastHeartbeat.IsZero() {
s.Status.LastHeartbeatTS = &lastHeartbeat
}
if upTime := store.GetUptime(); upTime > 0 {
duration := typeutil.NewDuration(upTime)
s.Status.Uptime = &duration
}

if store.State == metapb.StoreState_Up {
if store.GetState() == metapb.StoreState_Up {
if store.DownTime() > opt.MaxStoreDownTime.Duration {
s.Store.StateName = downStateName
} else if store.IsDisconnected() {
18 changes: 9 additions & 9 deletions server/api/store_test.go
@@ -275,21 +275,21 @@ func (s *testStoreSuite) TestUrlStoreFilter(c *C) {
}

func (s *testStoreSuite) TestDownState(c *C) {
store := &core.StoreInfo{
Store: &metapb.Store{
store := core.NewStoreInfo(
&metapb.Store{
State: metapb.StoreState_Up,
},
Stats: &pdpb.StoreStats{},
LastHeartbeatTS: time.Now(),
}
core.SetStoreStats(&pdpb.StoreStats{}),
core.SetLastHeartbeatTS(time.Now()),
)
storeInfo := newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String())

store.LastHeartbeatTS = time.Now().Add(-time.Minute * 2)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
newStore := store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-time.Minute * 2)))
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), newStore)
c.Assert(storeInfo.Store.StateName, Equals, disconnectedName)

store.LastHeartbeatTS = time.Now().Add(-time.Hour * 2)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
newStore = store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-time.Hour * 2)))
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), newStore)
c.Assert(storeInfo.Store.StateName, Equals, downStateName)
}
4 changes: 2 additions & 2 deletions server/api/trend.go
@@ -133,8 +133,8 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
LastHeartbeatTS: info.Status.LastHeartbeatTS,
Uptime: info.Status.Uptime,
}
s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetId())
s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetId())
s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetID())
s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetID())
trendStores = append(trendStores, s)
}
return trendStores, nil
51 changes: 30 additions & 21 deletions server/cluster.go
@@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
@@ -345,10 +346,10 @@ func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLa
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}
storeMeta := store.Store
storeMeta.Labels = labels
newStore := proto.Clone(store.GetMeta()).(*metapb.Store)
newStore.Labels = labels
// putStore will perform label merge.
err := c.putStore(storeMeta)
err := c.putStore(newStore)
return err
}

@@ -377,8 +378,8 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
if s.IsTombstone() {
continue
}
if s.GetId() != store.GetId() && s.GetAddress() == store.GetAddress() {
return errors.Errorf("duplicated store address: %v, already registered by %v", store, s.Store)
if s.GetID() != store.GetId() && s.GetAddress() == store.GetAddress() {
return errors.Errorf("duplicated store address: %v, already registered by %v", store, s.GetMeta())
}
}

@@ -388,9 +389,13 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
s = core.NewStoreInfo(store)
} else {
// Update an existed store.
s.Address = store.Address
s.Version = store.Version
s.MergeLabels(store.Labels)
labels := s.MergeLabels(store.GetLabels())

s = s.Clone(
core.SetStoreAddress(store.Address),
core.SetStoreVersion(store.Version),
core.SetStoreLabels(labels),
)
}
// Check location labels.
for _, k := range c.cachedCluster.GetLocationLabels() {
@@ -424,9 +429,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error {
return op.AddTo(core.StoreTombstonedErr{StoreID: storeID})
}

store.State = metapb.StoreState_Offline
log.Warnf("[store %d] store %s has been Offline", store.GetId(), store.GetAddress())
return cluster.putStore(store)
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline))
log.Warnf("[store %d] store %s has been Offline", newStore.GetID(), newStore.GetAddress())
return cluster.putStore(newStore)
}

// BuryStore marks a store as tombstone in cluster.
@@ -456,9 +461,9 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error { // revive:di
log.Warnf("forcedly bury store %v", store)
}

store.State = metapb.StoreState_Tombstone
log.Warnf("[store %d] store %s has been Tombstone", store.GetId(), store.GetAddress())
return cluster.putStore(store)
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone))
log.Warnf("[store %d] store %s has been Tombstone", newStore.GetID(), newStore.GetAddress())
return cluster.putStore(newStore)
}

// SetStoreState sets up a store's state.
@@ -473,13 +478,13 @@ func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) err
return core.NewStoreNotFoundErr(storeID)
}

store.State = state
newStore := store.Clone(core.SetStoreState(state))
log.Warnf("[store %d] set state to %v", storeID, state.String())
return cluster.putStore(store)
return cluster.putStore(newStore)
}

// SetStoreWeight sets up a store's leader/region balance weight.
func (c *RaftCluster) SetStoreWeight(storeID uint64, leader, region float64) error {
func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight float64) error {
c.RLock()
defer c.RUnlock()

@@ -488,12 +493,16 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leader, region float64) err
return core.NewStoreNotFoundErr(storeID)
}

if err := c.s.kv.SaveStoreWeight(storeID, leader, region); err != nil {
if err := c.s.kv.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil {
return err
}

store.LeaderWeight, store.RegionWeight = leader, region
return c.cachedCluster.putStore(store)
newStore := store.Clone(
core.SetLeaderWeight(leaderWeight),
core.SetRegionWeight(regionWeight),
)

return c.cachedCluster.putStore(newStore)
}

func (c *RaftCluster) checkStores() {
@@ -513,7 +522,7 @@ func (c *RaftCluster) checkStores() {
continue
}

offlineStore := store.Store
offlineStore := store.GetMeta()
// If the store is empty, it can be buried.
if cluster.getStoreRegionCount(offlineStore.GetId()) == 0 {
if err := c.BuryStore(offlineStore.GetId(), false); err != nil {
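A detail worth noting in the UpdateStoreLabels hunk above: the cached `*metapb.Store` is copied with `proto.Clone` before the labels are assigned, so the meta held by the cluster cache is never written in place. A self-contained sketch of that copy-then-modify step follows; the helper name `updatedLabels` is made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/kvproto/pkg/metapb"
)

// updatedLabels returns a copy of the cached store meta with new labels;
// proto.Clone deep-copies the message, so the cached value stays untouched.
func updatedLabels(cached *metapb.Store, labels []*metapb.StoreLabel) *metapb.Store {
	newStore := proto.Clone(cached).(*metapb.Store)
	newStore.Labels = labels
	return newStore
}

func main() {
	cached := &metapb.Store{Id: 1, Address: "127.0.0.1:20160"}
	updated := updatedLabels(cached, []*metapb.StoreLabel{{Key: "zone", Value: "z1"}})
	// The cached meta keeps zero labels; only the copy carries the new one.
	fmt.Println(len(cached.GetLabels()), len(updated.GetLabels()))
}
```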
23 changes: 11 additions & 12 deletions server/cluster_info.go
@@ -181,12 +181,12 @@ func (c *clusterInfo) GetStore(storeID uint64) *core.StoreInfo {
func (c *clusterInfo) putStore(store *core.StoreInfo) error {
c.Lock()
defer c.Unlock()
return c.putStoreLocked(store.Clone())
return c.putStoreLocked(store)
}

func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
if c.kv != nil {
if err := c.kv.SaveStore(store.Store); err != nil {
if err := c.kv.SaveStore(store.GetMeta()); err != nil {
return err
}
}
@@ -447,19 +447,18 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
if store == nil {
return core.NewStoreNotFoundErr(storeID)
}
store.Stats = proto.Clone(stats).(*pdpb.StoreStats)
store.LastHeartbeatTS = time.Now()

c.core.Stores.SetStore(store)
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(time.Now()))
c.core.Stores.SetStore(newStore)
return nil
}

func (c *clusterInfo) updateStoreStatusLocked(id uint64) {
c.core.Stores.SetLeaderCount(id, c.core.Regions.GetStoreLeaderCount(id))
c.core.Stores.SetRegionCount(id, c.core.Regions.GetStoreRegionCount(id))
c.core.Stores.SetPendingPeerCount(id, c.core.Regions.GetStorePendingPeerCount(id))
c.core.Stores.SetLeaderSize(id, c.core.Regions.GetStoreLeaderRegionSize(id))
c.core.Stores.SetRegionSize(id, c.core.Regions.GetStoreRegionSize(id))
leaderCount := c.core.Regions.GetStoreLeaderCount(id)
regionCount := c.core.Regions.GetStoreRegionCount(id)
pendingPeerCount := c.core.Regions.GetStorePendingPeerCount(id)
leaderRegionSize := c.core.Regions.GetStoreLeaderRegionSize(id)
regionSize := c.core.Regions.GetStoreRegionSize(id)
c.core.Stores.UpdateStoreStatusLocked(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// handleRegionHeartbeat updates the region information.
Expand Down Expand Up @@ -766,7 +765,7 @@ func (checker *prepareChecker) check(c *clusterInfo) bool {
if !store.IsUp() {
continue
}
storeID := store.GetId()
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.core.Regions.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
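The updateStoreStatusLocked hunk above gathers the five counters first and hands them to a single `UpdateStoreStatusLocked` call on the stores cache, so the store is copied once instead of once per setter. A rough sketch of what such a consolidated update could look like, building on the `StoreInfo`/`Clone` sketch near the top of this page; the `StoresInfo` layout and the `SetLeaderCount`-style option constructors are assumptions in the same style as the options visible in the diff.

```go
// StoresInfo is a stripped-down stores cache keyed by store ID; the method
// name ends in "Locked" because the caller is expected to hold the lock.
type StoresInfo struct {
	stores map[uint64]*StoreInfo
}

// UpdateStoreStatusLocked refreshes all status counters with one Clone,
// replacing five separate setter calls that each produced its own copy.
func (s *StoresInfo) UpdateStoreStatusLocked(id uint64, leaderCount, regionCount, pendingPeerCount int, leaderSize, regionSize int64) {
	store, ok := s.stores[id]
	if !ok {
		return
	}
	s.stores[id] = store.Clone(
		SetLeaderCount(leaderCount),
		SetRegionCount(regionCount),
		SetPendingPeerCount(pendingPeerCount),
		SetLeaderSize(leaderSize),
		SetRegionSize(regionSize),
	)
}
```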
45 changes: 24 additions & 21 deletions server/cluster_info_test.go
@@ -16,6 +16,7 @@ package server
import (
"math/rand"

"github.com/gogo/protobuf/proto"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
@@ -55,7 +56,7 @@ func (s *testStoresInfoSuite) TestStores(c *C) {
stores := newTestStores(n)

for i, store := range stores {
id := store.GetId()
id := store.GetID()
c.Assert(cache.GetStore(id), IsNil)
c.Assert(cache.BlockStore(id), NotNil)
cache.SetStore(store)
@@ -70,10 +71,10 @@
c.Assert(cache.GetStoreCount(), Equals, int(n))

for _, store := range cache.GetStores() {
c.Assert(store, DeepEquals, stores[store.GetId()-1])
c.Assert(store, DeepEquals, stores[store.GetID()-1])
}
for _, store := range cache.GetMetaStores() {
c.Assert(store, DeepEquals, stores[store.GetId()-1].Store)
c.Assert(store, DeepEquals, stores[store.GetId()-1].GetMeta())
}

c.Assert(cache.GetStoreCount(), Equals, int(n))
@@ -82,10 +83,12 @@
bytesRead := uint64(128 * 1024 * 1024)
store := cache.GetStore(1)

store.Stats.BytesWritten = bytesWritten
store.Stats.BytesRead = bytesRead
store.Stats.Interval = &pdpb.TimeInterval{EndTimestamp: 10, StartTimestamp: 0}
cache.SetStore(store)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesWritten = bytesWritten
newStats.BytesRead = bytesRead
newStats.Interval = &pdpb.TimeInterval{EndTimestamp: 10, StartTimestamp: 0}
newStore := store.Clone(core.SetStoreStats(newStats))
cache.SetStore(newStore)
c.Assert(cache.TotalBytesWriteRate(), Equals, float64(bytesWritten/10))
c.Assert(cache.TotalBytesReadRate(), Equals, float64(bytesRead/10))
}
@@ -311,7 +314,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {

for i, store := range stores {
storeStats := &pdpb.StoreStats{
StoreId: store.GetId(),
StoreId: store.GetID(),
Capacity: 100,
Available: 50,
RegionCount: 1,
@@ -321,23 +324,23 @@
c.Assert(cluster.putStore(store), IsNil)
c.Assert(cluster.getStoreCount(), Equals, i+1)

c.Assert(store.LastHeartbeatTS.IsZero(), IsTrue)
c.Assert(store.GetLastHeartbeatTS().IsZero(), IsTrue)

c.Assert(cluster.handleStoreHeartbeat(storeStats), IsNil)

s := cluster.GetStore(store.GetId())
c.Assert(s.LastHeartbeatTS.IsZero(), IsFalse)
c.Assert(s.Stats, DeepEquals, storeStats)
s := cluster.GetStore(store.GetID())
c.Assert(s.GetLastHeartbeatTS().IsZero(), IsFalse)
c.Assert(s.GetStoreStats(), DeepEquals, storeStats)
}

c.Assert(cluster.getStoreCount(), Equals, int(n))

for _, store := range stores {
tmp := &metapb.Store{}
ok, err := cluster.kv.LoadStore(store.GetId(), tmp)
ok, err := cluster.kv.LoadStore(store.GetID(), tmp)
c.Assert(ok, IsTrue)
c.Assert(err, IsNil)
c.Assert(tmp, DeepEquals, store.Store)
c.Assert(tmp, DeepEquals, store.GetMeta())
}
}

@@ -458,19 +461,19 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {

for _, region := range regions {
for _, store := range cluster.GetRegionStores(region) {
c.Assert(region.GetStorePeer(store.GetId()), NotNil)
c.Assert(region.GetStorePeer(store.GetID()), NotNil)
}
for _, store := range cluster.GetFollowerStores(region) {
peer := region.GetStorePeer(store.GetId())
peer := region.GetStorePeer(store.GetID())
c.Assert(peer.GetId(), Not(Equals), region.GetLeader().GetId())
}
}

for _, store := range cluster.core.Stores.GetStores() {
c.Assert(store.LeaderCount, Equals, cluster.core.Regions.GetStoreLeaderCount(store.GetId()))
c.Assert(store.RegionCount, Equals, cluster.core.Regions.GetStoreRegionCount(store.GetId()))
c.Assert(store.LeaderSize, Equals, cluster.core.Regions.GetStoreLeaderRegionSize(store.GetId()))
c.Assert(store.RegionSize, Equals, cluster.core.Regions.GetStoreRegionSize(store.GetId()))
c.Assert(store.GetLeaderCount(), Equals, cluster.core.Regions.GetStoreLeaderCount(store.GetID()))
c.Assert(store.GetRegionCount(), Equals, cluster.core.Regions.GetStoreRegionCount(store.GetID()))
c.Assert(store.GetLeaderSize(), Equals, cluster.core.Regions.GetStoreLeaderRegionSize(store.GetID()))
c.Assert(store.GetRegionSize(), Equals, cluster.core.Regions.GetStoreRegionSize(store.GetID()))
}

// Test with kv.
@@ -672,7 +675,7 @@ func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) {
func checkPendingPeerCount(expect []int, cluster *clusterInfo, c *C) {
for i, e := range expect {
s := cluster.core.Stores.GetStore(uint64(i + 1))
c.Assert(s.PendingPeerCount, Equals, e)
c.Assert(s.GetPendingPeerCount(), Equals, e)
}
}
