diff --git a/errors.toml b/errors.toml index 44a13ef13eb..18e7746c83d 100644 --- a/errors.toml +++ b/errors.toml @@ -551,6 +551,11 @@ error = ''' client url empty ''' +["PD:server:ErrConfiguration"] +error = ''' +cannot set invalid configuration +''' + ["PD:server:ErrLeaderNil"] error = ''' leader is nil diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 5c5ce5cc81f..677bd16ba5c 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -135,6 +135,7 @@ var ( ErrClientURLEmpty = errors.Normalize("client url empty", errors.RFCCodeText("PD:server:ErrClientEmpty")) ErrLeaderNil = errors.Normalize("leader is nil", errors.RFCCodeText("PD:server:ErrLeaderNil")) ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd")) + ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration")) ) // logutil errors diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ffb0f5972c1..9f1b05678ae 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -657,11 +657,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { region.GetApproximateKeys() != origin.GetApproximateKeys() { saveCache = true } - - if c.traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() || - region.GetBytesRead() != origin.GetBytesRead() || - region.GetKeysWritten() != origin.GetKeysWritten() || - region.GetKeysRead() != origin.GetKeysRead()) { + // Once flow has changed, will update the cache. + // Because keys and bytes are strongly related, only bytes are judged. + if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || + region.GetRoundBytesRead() != origin.GetRoundBytesRead()) { saveCache, needSync = true, true } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 2fe5cf50f49..ed785d08609 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -512,23 +512,11 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { c.Assert(cluster.processRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - // Change keys written. - region = region.Clone(core.SetWrittenKeys(240)) - regions[i] = region - c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) - // Change bytes read. region = region.Clone(core.SetReadBytes(1080000)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - - // Change keys read. - region = region.Clone(core.SetReadKeys(1080)) - regions[i] = region - c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) } regionCounts := make(map[uint64]int) diff --git a/server/config/config.go b/server/config/config.go index 2ad88b1acf4..d83486f534c 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -227,6 +227,7 @@ const ( defaultUseRegionStorage = true defaultTraceRegionFlow = true + defaultFlowRoundByDigit = 3 defaultMaxResetTSGap = 24 * time.Hour defaultKeyType = "table" @@ -327,6 +328,12 @@ func adjustInt64(v *int64, defValue int64) { } } +func adjustInt(v *int, defValue int) { + if *v == 0 { + *v = defValue + } +} + func adjustFloat64(v *float64, defValue float64) { if *v == 0 { *v = defValue @@ -1070,7 +1077,10 @@ type PDServerConfig struct { // There are some values supported: "auto", "none", or a specific address, default: "auto" DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"` // TraceRegionFlow the option to update flow information of regions + // TODO: deprecate TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"` + // FlowRoundByDigit used to discretization processing flow information. + FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"` } func (c *PDServerConfig) adjust(meta *configMetaData) error { @@ -1090,6 +1100,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("trace-region-flow") { c.TraceRegionFlow = defaultTraceRegionFlow } + if !meta.IsDefined("flow-round-by-digit") { + adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit) + } return c.Validate() } @@ -1111,6 +1124,9 @@ func (c *PDServerConfig) Validate() error { return err } } + if c.FlowRoundByDigit < 0 { + return errs.ErrConfigItem.GenWithStack("flow round by digit cannot be negative number") + } return nil } diff --git a/server/core/region.go b/server/core/region.go index fef60c6aead..81ed7a4bb64 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -52,6 +52,7 @@ type RegionInfo struct { approximateKeys int64 interval *pdpb.TimeInterval replicationStatus *replication_modepb.RegionReplicationStatus + flowRoundDivisor uint64 } // NewRegionInfo creates RegionInfo with region's meta and leader peer. @@ -396,11 +397,27 @@ func (r *RegionInfo) GetBytesRead() uint64 { return r.readBytes } +// GetRoundBytesRead returns the read bytes of the region. +func (r *RegionInfo) GetRoundBytesRead() uint64 { + if r.flowRoundDivisor == 0 { + return r.readBytes + } + return ((r.readBytes + r.flowRoundDivisor/2) / r.flowRoundDivisor) * r.flowRoundDivisor +} + // GetBytesWritten returns the written bytes of the region. func (r *RegionInfo) GetBytesWritten() uint64 { return r.writtenBytes } +// GetRoundBytesWritten returns the written bytes of the region. +func (r *RegionInfo) GetRoundBytesWritten() uint64 { + if r.flowRoundDivisor == 0 { + return r.writtenBytes + } + return ((r.writtenBytes + r.flowRoundDivisor/2) / r.flowRoundDivisor) * r.flowRoundDivisor +} + // GetKeysWritten returns the written keys of the region. func (r *RegionInfo) GetKeysWritten() uint64 { return r.writtenKeys diff --git a/server/core/region_option.go b/server/core/region_option.go index e201ff691c4..71fe6acca65 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -14,6 +14,7 @@ package core import ( + "math" "sort" "github.com/pingcap/kvproto/pkg/metapb" @@ -35,6 +36,13 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption { } } +// WithFlowRoundByDigit set the digit, which use to round to the nearest number +func WithFlowRoundByDigit(digit int) RegionCreateOption { + return func(region *RegionInfo) { + region.flowRoundDivisor = uint64(math.Pow10(digit)) + } +} + // WithPendingPeers sets the pending peers for the region. func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption { return func(region *RegionInfo) { diff --git a/server/core/region_test.go b/server/core/region_test.go index b20ee6b77ce..7219bb36702 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -15,6 +15,7 @@ package core import ( "fmt" + "math" "math/rand" "strconv" "strings" @@ -155,6 +156,29 @@ func (s *testRegionInfoSuite) TestSortedEqual(c *C) { } } +func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) { + testcases := []struct { + flow uint64 + digit int + expect uint64 + }{ + {10, 0, 10}, + {13, 1, 10}, + {11807, 3, 12000}, + {252623, 4, 250000}, + {258623, 4, 260000}, + {258623, 64, 0}, + {252623, math.MaxInt64, 0}, + {252623, math.MinInt64, 252623}, + } + for _, t := range testcases { + r := NewRegionInfo(&metapb.Region{Id: 100}, nil, WithFlowRoundByDigit(t.digit)) + r.readBytes = t.flow + r.writtenBytes = t.flow + c.Assert(r.GetRoundBytesRead(), Equals, t.expect) + } +} + var _ = Suite(&testRegionMapSuite{}) type testRegionMapSuite struct{} diff --git a/server/grpc_service.go b/server/grpc_service.go index fc9c10b1990..f033b35048b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -502,6 +502,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { server := &heartbeatServer{stream: stream} + FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit var ( forwardStream pdpb.PD_RegionHeartbeatClient cancel context.CancelFunc @@ -583,10 +584,12 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { if time.Since(lastBind) > s.cfg.HeartbeatStreamBindInterval.Duration { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) + // refresh FlowRoundByDigit + FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit lastBind = time.Now() } - region := core.RegionFromHeartbeat(request) + region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit)) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc() diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 083bd80dd87..97aa40d6ad7 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -110,6 +110,15 @@ func (s *configTestSuite) TestConfig(c *C) { c.Assert(err, IsNil) c.Assert(svr.GetPDServerConfig().TraceRegionFlow, Equals, false) + args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "10"} + _, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + c.Assert(svr.GetPDServerConfig().FlowRoundByDigit, Equals, 10) + + args = []string{"-u", pdAddr, "config", "set", "flow-round-by-digit", "-10"} + _, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, NotNil) + // config show schedule args = []string{"-u", pdAddr, "config", "show", "schedule"} output, err = pdctl.ExecuteCommand(cmd, args...)