Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: make region statistic loss some precision #3728

Merged
merged 14 commits into from
Jun 4, 2021
5 changes: 5 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ var (
ErrSemverNewVersion = errors.Normalize("new version error", errors.RFCCodeText("PD:semver:ErrSemverNewVersion"))
)

// config
var (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the error is PD:filepath:ErrFilePathAbs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated. PTAL

ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
)

// log
var (
ErrInitLogger = errors.Normalize("init logger error", errors.RFCCodeText("PD:log:ErrInitLogger"))
Expand Down
9 changes: 4 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,11 +655,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}

if c.traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() ||
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead()) {
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
region.GetRoundBytesRead() != origin.GetRoundBytesRead()) {
saveCache, needSync = true, true
}

Expand Down
12 changes: 0 additions & 12 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,23 +512,11 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys written.
region = region.Clone(core.SetWrittenKeys(240))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys read.
region = region.Clone(core.SetReadKeys(1080))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
}

regionCounts := make(map[uint64]int)
Expand Down
16 changes: 16 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ const (

defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowRoundByDigit = 3
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"

Expand Down Expand Up @@ -327,6 +328,12 @@ func adjustInt64(v *int64, defValue int64) {
}
}

func adjustInt(v *int, defValue int) {
if *v == 0 {
*v = defValue
}
}

func adjustFloat64(v *float64, defValue float64) {
if *v == 0 {
*v = defValue
Expand Down Expand Up @@ -1070,7 +1077,10 @@ type PDServerConfig struct {
// There are some values supported: "auto", "none", or a specific address, default: "auto"
DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"`
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
rleungx marked this conversation as resolved.
Show resolved Hide resolved
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
// FlowRoundByDigit used to discretization processing flow information.
FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
Expand All @@ -1090,6 +1100,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("trace-region-flow") {
c.TraceRegionFlow = defaultTraceRegionFlow
}
if !meta.IsDefined("flow-round-by-digit") {
adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit)
}
return c.Validate()
}

Expand All @@ -1111,6 +1124,9 @@ func (c *PDServerConfig) Validate() error {
return err
}
}
if c.FlowRoundByDigit < 0 {
return errs.ErrConfigItem.GenWithStack("flow round by digit cannot be negative number")
}

return nil
}
Expand Down
20 changes: 20 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -52,6 +53,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
flowRoundByDigit int
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -396,11 +398,29 @@ func (r *RegionInfo) GetBytesRead() uint64 {
return r.readBytes
}

// GetRoundBytesRead returns the read bytes of the region.
func (r *RegionInfo) GetRoundBytesRead() uint64 {
divisor := uint64(math.Pow10(r.flowRoundByDigit))
HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
if divisor == 0 {
return r.readBytes
}
return ((r.readBytes + divisor/2) / divisor) * divisor
}

// GetBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetBytesWritten() uint64 {
return r.writtenBytes
}

// GetRoundBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetRoundBytesWritten() uint64 {
divisor := uint64(math.Pow10(r.flowRoundByDigit))
HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
if divisor == 0 {
return r.writtenBytes
}
return ((r.writtenBytes + divisor/2) / divisor) * divisor
}

// GetKeysWritten returns the written keys of the region.
func (r *RegionInfo) GetKeysWritten() uint64 {
return r.writtenKeys
Expand Down
10 changes: 10 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
}
}

// WithFlowRoundByDigit set the digit, which use to round to the nearest number
func WithFlowRoundByDigit(digit int) RegionCreateOption {
nolouch marked this conversation as resolved.
Show resolved Hide resolved
return func(region *RegionInfo) {
if digit == 0 {
return
}
region.flowRoundByDigit = int(digit)
}
}

// WithPendingPeers sets the pending peers for the region.
func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
25 changes: 25 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package core

import (
"fmt"
"math"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -155,6 +156,30 @@ func (s *testRegionInfoSuite) TestSortedEqual(c *C) {
}
}

func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
testcases := []struct {
flow uint64
digit int
expect uint64
}{
{10, 0, 10},
{13, 1, 10},
{11807, 3, 12000},
{252623, 4, 250000},
{258623, 4, 260000},
{258623, 64, 0},
{252623, math.MaxInt64, 0},
{252623, math.MinInt64, 252623},
}
for _, t := range testcases {
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.flowRoundByDigit = t.digit
r.readBytes = t.flow
r.writtenBytes = t.flow
c.Assert(r.GetRoundBytesRead(), Equals, t.expect)
}
}

var _ = Suite(&testRegionMapSuite{})

type testRegionMapSuite struct{}
Expand Down
5 changes: 4 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit
HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
var (
forwardStream pdpb.PD_RegionHeartbeatClient
cancel context.CancelFunc
Expand Down Expand Up @@ -583,10 +584,12 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
if time.Since(lastBind) > s.cfg.HeartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
s.hbStreams.BindStream(storeID, server)
// refresh FlowRoundByDigit
FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
9 changes: 9 additions & 0 deletions tests/pdctl/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func (s *configTestSuite) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().TraceRegionFlow, Equals, false)

args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().FlowRoundByDigit, Equals, 10)

args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "-10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, NotNil)

// config show schedule
args = []string{"-u", pdAddr, "config", "show", "schedule"}
output, err = pdctl.ExecuteCommand(cmd, args...)
Expand Down