Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: make region statistic loss some precision #3728

Merged
merged 14 commits into from
Jun 4, 2021
5 changes: 5 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ var (
ErrSemverNewVersion = errors.Normalize("new version error", errors.RFCCodeText("PD:semver:ErrSemverNewVersion"))
)

// config
var (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the error is PD:filepath:ErrFilePathAbs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated. PTAL

ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:filepath:ErrFilePathAbs"))
)

// log
var (
ErrInitLogger = errors.Normalize("init logger error", errors.RFCCodeText("PD:log:ErrInitLogger"))
Expand Down
27 changes: 18 additions & 9 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ const (

defaultLeaderPriorityCheckInterval = time.Minute

defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowBucketsWidth = 512
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"
defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowRroundByDigit = 3
nolouch marked this conversation as resolved.
Show resolved Hide resolved
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"

defaultStrictlyMatchLabel = false
defaultEnablePlacementRules = true
Expand Down Expand Up @@ -328,6 +328,12 @@ func adjustInt64(v *int64, defValue int64) {
}
}

func adjustInt(v *int, defValue int) {
if *v == 0 {
*v = defValue
}
}

func adjustFloat64(v *float64, defValue float64) {
if *v == 0 {
*v = defValue
Expand Down Expand Up @@ -1073,8 +1079,8 @@ type PDServerConfig struct {
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
rleungx marked this conversation as resolved.
Show resolved Hide resolved
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
// FlowBucketsWidth is the width used to discretization processing flow information
FlowBucketsWidth uint64 `toml:"flow-buckets-width" json:"flow-buckets-width"`
// FlowRroundByDigit is the width used to discretization processing flow information
nolouch marked this conversation as resolved.
Show resolved Hide resolved
FlowRroundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
Expand All @@ -1094,8 +1100,8 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("trace-region-flow") {
c.TraceRegionFlow = defaultTraceRegionFlow
}
if !meta.IsDefined("flow-buckets-width") {
adjustUint64(&c.FlowBucketsWidth, defaultFlowBucketsWidth)
if !meta.IsDefined("flow-round-by-digit") {
adjustInt(&c.FlowRroundByDigit, defaultFlowRroundByDigit)
}
return c.Validate()
}
Expand All @@ -1118,6 +1124,9 @@ func (c *PDServerConfig) Validate() error {
return err
}
}
if c.FlowRroundByDigit < 0 {
return errs.ErrConfigItem.GenWithStack("flow round by digit cannot be negative number")
}

return nil
}
Expand Down
13 changes: 8 additions & 5 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -52,7 +53,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
flowBucketsWidth uint64
flowRroundByDigit int
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -399,10 +400,11 @@ func (r *RegionInfo) GetBytesRead() uint64 {

// GetRoundBytesRead returns the read bytes of the region.
func (r *RegionInfo) GetRoundBytesRead() uint64 {
if r.flowBucketsWidth == 0 {
divisor := uint64(math.Pow10(r.flowRroundByDigit))
if divisor == 0 {
return r.readBytes
}
return (r.readBytes / r.flowBucketsWidth) * r.flowBucketsWidth
return ((r.readBytes + divisor/2) / divisor) * divisor
}

// GetBytesWritten returns the written bytes of the region.
Expand All @@ -412,10 +414,11 @@ func (r *RegionInfo) GetBytesWritten() uint64 {

// GetRoundBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetRoundBytesWritten() uint64 {
if r.flowBucketsWidth == 0 {
divisor := uint64(math.Pow10(r.flowRroundByDigit))
if divisor == 0 {
return r.writtenBytes
}
return (r.writtenBytes / r.flowBucketsWidth) * r.flowBucketsWidth
return ((r.writtenBytes + divisor/2) / divisor) * divisor
}

// GetKeysWritten returns the written keys of the region.
Expand Down
9 changes: 4 additions & 5 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
}
}

// WithFlowBucketsWidth set the bucket width, which use to round to the nearest bucket
// with the with.
func WithFlowBucketsWidth(width uint64) RegionCreateOption {
// WithFlowRroundByDigit set the digit, which use to round to the nearest number
func WithFlowRroundByDigit(digit int) RegionCreateOption {
return func(region *RegionInfo) {
if width == 0 {
if digit == 0 {
return
}
region.flowBucketsWidth = width
region.flowRroundByDigit = int(digit)
}
}

Expand Down
16 changes: 9 additions & 7 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,21 @@ func (s *testRegionInfoSuite) TestSortedEqual(c *C) {
func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
testcases := []struct {
flow uint64
bits uint64
digit int
expect uint64
}{
{10, 0, 10},
{13, 1, 13},
{11807, 512, 11776},
{252623, 512, 252416},
{252623, 10240, 245760},
{252623, math.MaxUint64, 0},
{13, 1, 10},
{11807, 3, 12000},
{252623, 4, 250000},
{258623, 4, 260000},
{258623, 64, 0},
{252623, math.MaxInt64, 0},
{252623, math.MinInt64, 252623},
}
for _, t := range testcases {
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.flowBucketsWidth = t.bits
r.flowRroundByDigit = t.digit
r.readBytes = t.flow
r.writtenBytes = t.flow
c.Assert(r.GetRoundBytesRead(), Equals, t.expect)
Expand Down
8 changes: 4 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
FlowBucketsWidth := s.persistOptions.GetPDServerConfig().FlowBucketsWidth
FlowRroundByDigit := s.persistOptions.GetPDServerConfig().FlowRroundByDigit
var (
forwardStream pdpb.PD_RegionHeartbeatClient
cancel context.CancelFunc
Expand Down Expand Up @@ -584,12 +584,12 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
if time.Since(lastBind) > s.cfg.HeartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
s.hbStreams.BindStream(storeID, server)
// refresh FlowBucketsWidth
FlowBucketsWidth = s.persistOptions.GetPDServerConfig().FlowBucketsWidth
// refresh FlowRroundByDigit
FlowRroundByDigit = s.persistOptions.GetPDServerConfig().FlowRroundByDigit
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, core.WithFlowBucketsWidth(FlowBucketsWidth))
region := core.RegionFromHeartbeat(request, core.WithFlowRroundByDigit(FlowRroundByDigit))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
8 changes: 6 additions & 2 deletions tests/pdctl/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ func (s *configTestSuite) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().TraceRegionFlow, Equals, false)

args = []string{"-u", pdAddr, "config", "set", "flow-buckets-width", "10"}
args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().FlowBucketsWidth, Equals, uint64(10))
c.Assert(svr.GetPDServerConfig().FlowRroundByDigit, Equals, 10)

args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "-10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, NotNil)

// config show schedule
args = []string{"-u", pdAddr, "config", "show", "schedule"}
Expand Down