Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Jun 2, 2021
1 parent b819c56 commit f4c3d20
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 100 deletions.
68 changes: 0 additions & 68 deletions go.sum

Large diffs are not rendered by default.

12 changes: 0 additions & 12 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,23 +512,11 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys written.
region = region.Clone(core.SetWrittenKeys(240))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys read.
region = region.Clone(core.SetReadKeys(1080))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
}

regionCounts := make(map[uint64]int)
Expand Down
10 changes: 5 additions & 5 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ const (

defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultFlowRoundingBits = 9 // 1<<9 = 512
defaultFlowBucketsWidth = 512
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"

Expand Down Expand Up @@ -1073,8 +1073,8 @@ type PDServerConfig struct {
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
// FlowRoundingBits is the bits used to round with shift.
FlowRoundingBits uint64 `toml:"flow-round-bits" json:"flow-round-bits"`
// FlowBucketsWidth is the width used to discretization processing flow information
FlowBucketsWidth uint64 `toml:"flow-rounding-bits" json:"flow-rounding-bits"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
Expand All @@ -1094,8 +1094,8 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("trace-region-flow") {
c.TraceRegionFlow = defaultTraceRegionFlow
}
if !meta.IsDefined("flow-round-bits") {
adjustUint64(&c.FlowRoundingBits, defaultFlowRoundingBits)
if !meta.IsDefined("flow-rounding-bits") {
adjustUint64(&c.FlowBucketsWidth, defaultFlowBucketsWidth)
}
return c.Validate()
}
Expand Down
10 changes: 5 additions & 5 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
FlowRoundingBits uint64
flowBucketsWidth uint64
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -399,10 +399,10 @@ func (r *RegionInfo) GetBytesRead() uint64 {

// GetRoundBytesRead returns the read bytes of the region.
func (r *RegionInfo) GetRoundBytesRead() uint64 {
if r.FlowRoundingBits == 0 {
if r.flowBucketsWidth == 0 {
return r.readBytes
}
return (r.readBytes >> r.FlowRoundingBits) << r.FlowRoundingBits
return (r.readBytes / r.flowBucketsWidth) * r.flowBucketsWidth
}

// GetBytesWritten returns the written bytes of the region.
Expand All @@ -412,10 +412,10 @@ func (r *RegionInfo) GetBytesWritten() uint64 {

// GetRoundBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetRoundBytesWritten() uint64 {
if r.FlowRoundingBits == 0 {
if r.flowBucketsWidth == 0 {
return r.writtenBytes
}
return (r.writtenBytes >> r.FlowRoundingBits) << r.FlowRoundingBits
return (r.writtenBytes / r.flowBucketsWidth) * r.flowBucketsWidth
}

// GetKeysWritten returns the written keys of the region.
Expand Down
6 changes: 3 additions & 3 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
}
}

// WithFlowRoundingBits set the round step, which use to round to the Nearest number
// WithFlowBucketsWidth set the round step, which use to round to the Nearest number
// with the step.
func WithFlowRoundingBits(bits uint64) RegionCreateOption {
func WithFlowBucketsWidth(bits uint64) RegionCreateOption {
return func(region *RegionInfo) {
if bits == 0 {
return
}
region.FlowRoundingBits = bits
region.flowBucketsWidth = bits
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
}
for _, t := range testcases {
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.FlowRoundingBits = t.bits
r.flowBucketsWidth = t.bits
r.readBytes = t.flow
r.writtenBytes = t.flow
c.Assert(r.GetRoundBytesRead(), Equals, t.expect)
Expand Down
8 changes: 4 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
FlowRoundingBits := s.persistOptions.GetPDServerConfig().FlowRoundingBits
FlowBucketsWidth := s.persistOptions.GetPDServerConfig().FlowBucketsWidth
var (
forwardStream pdpb.PD_RegionHeartbeatClient
cancel context.CancelFunc
Expand Down Expand Up @@ -584,12 +584,12 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
if time.Since(lastBind) > s.cfg.HeartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
s.hbStreams.BindStream(storeID, server)
// refresh FlowRoundingBits
FlowRoundingBits = s.persistOptions.GetPDServerConfig().FlowRoundingBits
// refresh FlowBucketsWidth
FlowBucketsWidth = s.persistOptions.GetPDServerConfig().FlowBucketsWidth
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, core.WithFlowRoundingBits(FlowRoundingBits))
region := core.RegionFromHeartbeat(request, core.WithFlowBucketsWidth(FlowBucketsWidth))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
4 changes: 2 additions & 2 deletions tests/pdctl/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ func (s *configTestSuite) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().TraceRegionFlow, Equals, false)

args = []string{"-u", pdAddr, "config", "set", "flow-loss-precision", "502"}
args = []string{"-u", pdAddr, "config", "set", "flow-rounding-bits", "10"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().FlowRoundingBits, Equals, uint64(502))
c.Assert(svr.GetPDServerConfig().FlowBucketsWidth, Equals, uint64(10))

// config show schedule
args = []string{"-u", pdAddr, "config", "show", "schedule"}
Expand Down

0 comments on commit f4c3d20

Please sign in to comment.