Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: make region statistic loss some precision #3728

Merged
merged 14 commits into from
Jun 4, 2021
68 changes: 68 additions & 0 deletions go.sum

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,11 +655,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}

if c.traceRegionFlow && (region.GetBytesWritten() != origin.GetBytesWritten() ||
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead()) {
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
HunDunDM marked this conversation as resolved.
Show resolved Hide resolved
region.GetRoundBytesRead() != origin.GetRoundBytesRead()) {
saveCache, needSync = true, true
}

Expand Down
7 changes: 7 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ const (

defaultUseRegionStorage = true
defaultTraceRegionFlow = true
defaultflowRoundStep = 512 // 512 B
defaultMaxResetTSGap = 24 * time.Hour
defaultKeyType = "table"

Expand Down Expand Up @@ -1070,7 +1071,10 @@ type PDServerConfig struct {
// There are some values supported: "auto", "none", or a specific address, default: "auto"
DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"`
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
rleungx marked this conversation as resolved.
Show resolved Hide resolved
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
//FlowRoundStep is the step used to round to nearest.
nolouch marked this conversation as resolved.
Show resolved Hide resolved
FlowRoundStep uint64 `toml:"flow-round-step" json:"flow-round-step"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
Expand All @@ -1090,6 +1094,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("trace-region-flow") {
c.TraceRegionFlow = defaultTraceRegionFlow
}
if !meta.IsDefined("flow-loss-precision") {
nolouch marked this conversation as resolved.
Show resolved Hide resolved
adjustUint64(&c.FlowRoundStep, defaultflowRoundStep)
}
return c.Validate()
}

Expand Down
17 changes: 17 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
flowRoundStep uint64
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -396,11 +397,27 @@ func (r *RegionInfo) GetBytesRead() uint64 {
return r.readBytes
}

// GetRoundBytesRead returns the read bytes of the region.
func (r *RegionInfo) GetRoundBytesRead() uint64 {
if r.flowRoundStep == 0 {
return r.readBytes
}
return (r.readBytes / r.flowRoundStep) * r.flowRoundStep
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}

// GetBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetBytesWritten() uint64 {
return r.writtenBytes
}

// GetRoundBytesWritten returns the written bytes of the region.
func (r *RegionInfo) GetRoundBytesWritten() uint64 {
if r.flowRoundStep == 0 {
return r.writtenBytes
}
return (r.writtenBytes / r.flowRoundStep) * r.flowRoundStep
}

// GetKeysWritten returns the written keys of the region.
func (r *RegionInfo) GetKeysWritten() uint64 {
return r.writtenKeys
Expand Down
11 changes: 11 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
}
}

// WithFlowRoundStep set the round step, which use to round to the Nearest number
// with the step.
func WithFlowRoundStep(step uint64) RegionCreateOption {
return func(region *RegionInfo) {
if step == 0 {
return
}
region.flowRoundStep = step
}
}

// WithPendingPeers sets the pending peers for the region.
func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
5 changes: 4 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {
// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
flowRoundStep := s.persistOptions.GetPDServerConfig().FlowRoundStep
var (
forwardStream pdpb.PD_RegionHeartbeatClient
cancel context.CancelFunc
Expand Down Expand Up @@ -583,10 +584,12 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
if time.Since(lastBind) > s.cfg.HeartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
s.hbStreams.BindStream(storeID, server)
// refresh flowRoundStep
flowRoundStep = s.persistOptions.GetPDServerConfig().FlowRoundStep
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
region := core.RegionFromHeartbeat(request, core.WithFlowRoundStep(flowRoundStep))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
5 changes: 5 additions & 0 deletions tests/pdctl/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (s *configTestSuite) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().TraceRegionFlow, Equals, false)

args = []string{"-u", pdAddr, "config", "set", "flow-loss-precision", "502"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
c.Assert(svr.GetPDServerConfig().FlowRoundStep, Equals, uint64(502))

// config show schedule
args = []string{"-u", pdAddr, "config", "show", "schedule"}
output, err = pdctl.ExecuteCommand(cmd, args...)
Expand Down