Skip to content

Commit

Permalink
server: make region statistic loss some precision (#3728)
Browse files Browse the repository at this point in the history
* add the loss precision

Signed-off-by: nolouch <[email protected]>

* fix comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comments

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* fix

Signed-off-by: nolouch <[email protected]>

* clean

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
nolouch and ti-chi-bot authored Jun 4, 2021
1 parent da4cdd4 commit 6891009
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 18 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,11 @@ error = '''
client url empty
'''

["PD:server:ErrConfiguration"]
error = '''
cannot set invalid configuration
'''

["PD:server:ErrLeaderNil"]
error = '''
leader is nil
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ var (
ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty"))
ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil"))
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
)

// logutil errors
Expand Down
9 changes: 4 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,11 +657,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}

if c.traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() ||
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead()) {
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead()) {
saveCache, needSync = true, true
}

Expand Down
12 changes: 0 additions & 12 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,23 +512,11 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys written.
region = region.Clone(core.SetWrittenKeys(240))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys read.
region = region.Clone(core.SetReadKeys(1080))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
}

regionCounts := make(map[uint64]int)
Expand Down
16 changes: 16 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ const (

defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowRoundByDigit = 3
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"

Expand Down Expand Up @@ -327,6 +328,12 @@ func adjustInt64(v *int64, defValue int64) {
}
}

func adjustInt(v *int, defValue int) {
if *v == 0 {
*v = defValue
}
}

func adjustFloat64(v *float64, defValue float64) {
if *v == 0 {
*v = defValue
Expand Down Expand Up @@ -1070,7 +1077,10 @@ type PDServerConfig struct {
// There are some values supported: "auto", "none", or a specific address, default: "auto"
DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"`
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
// FlowRoundByDigit used to discretization processing flow information.
FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
Expand All @@ -1090,6 +1100,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("trace-region-flow") {
c.TraceRegionFlow = defaultTraceRegionFlow
}
if !meta.IsDefined("flow-round-by-digit") {
adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit)
}
return c.Validate()
}

Expand All @@ -1111,6 +1124,9 @@ func (c *PDServerConfig) Validate() error {
return err
}
}
if c.FlowRoundByDigit < 0 {
return errs.ErrConfigItem.GenWithStack("flow round by digit cannot be negative number")
}

return nil
}
Expand Down
17 changes: 17 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
flowRoundDivisor uint64
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -396,11 +397,27 @@ func (r *RegionInfo) GetBytesRead() uint64 {
return r.readBytes
}

// GetRoundBytesRead returns the read bytes of the region.
func (r *RegionInfo) GetRoundBytesRead() uint64 {
if r.flowRoundDivisor == 0 {
return r.readBytes
}
return ((r.readBytes + r.flowRoundDivisor/2) / r.flowRoundDivisor) * r.flowRoundDivisor
}

// GetBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetBytesWritten() uint64 {
return r.writtenBytes
}

// GetRoundBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetRoundBytesWritten() uint64 {
if r.flowRoundDivisor == 0 {
return r.writtenBytes
}
return ((r.writtenBytes + r.flowRoundDivisor/2) / r.flowRoundDivisor) * r.flowRoundDivisor
}

// GetKeysWritten returns the written keys of the region.
func (r *RegionInfo) GetKeysWritten() uint64 {
return r.writtenKeys
Expand Down
8 changes: 8 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package core

import (
"math"
"sort"

"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -35,6 +36,13 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
}
}

// WithFlowRoundByDigit set the digit, which use to round to the nearest number
func WithFlowRoundByDigit(digit int) RegionCreateOption {
return func(region *RegionInfo) {
region.flowRoundDivisor = uint64(math.Pow10(digit))
}
}

// WithPendingPeers sets the pending peers for the region.
func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
24 changes: 24 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package core

import (
"fmt"
"math"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -155,6 +156,29 @@ func (s *testRegionInfoSuite) TestSortedEqual(c *C) {
}
}

func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
testcases := []struct {
flow uint64
digit int
expect uint64
}{
{10, 0, 10},
{13, 1, 10},
{11807, 3, 12000},
{252623, 4, 250000},
{258623, 4, 260000},
{258623, 64, 0},
{252623, math.MaxInt64, 0},
{252623, math.MinInt64, 252623},
}
for _, t := range testcases {
r := NewRegionInfo(&metapb.Region{Id: 100}, nil, WithFlowRoundByDigit(t.digit))
r.readBytes = t.flow
r.writtenBytes = t.flow
c.Assert(r.GetRoundBytesRead(), Equals, t.expect)
}
}

var _ = Suite(&testRegionMapSuite{})

type testRegionMapSuite struct{}
Expand Down
5 changes: 4 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit
var (
forwardStream pdpb.PD_RegionHeartbeatClient
cancel context.CancelFunc
Expand Down Expand Up @@ -583,10 +584,12 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
if time.Since(lastBind) > s.cfg.HeartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
s.hbStreams.BindStream(storeID, server)
// refresh FlowRoundByDigit
FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
9 changes: 9 additions & 0 deletions tests/pdctl/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ func (s *configTestSuite) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().TraceRegionFlow, Equals, false)

args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().FlowRoundByDigit, Equals, 10)

args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "-10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, NotNil)

// config show schedule
args = []string{"-u", pdAddr, "config", "show", "schedule"}
output, err = pdctl.ExecuteCommand(cmd, args...)
Expand Down

0 comments on commit 6891009

Please sign in to comment.