Skip to content

Commit

Permalink
Rate limit reading index files
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Jul 20, 2020
1 parent e8f25ea commit 3cb6f0e
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ require (
go.etcd.io/etcd v3.4.3+incompatible
go.uber.org/atomic v1.5.1
go.uber.org/config v1.4.0
go.uber.org/ratelimit v0.1.0
go.uber.org/zap v1.13.0
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200601175630-2caf76543d99 // indirect
google.golang.org/grpc v1.27.1
gopkg.in/go-ini/ini.v1 v1.57.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0 h1:f3WCSC2KzAcBXGATIxAB1E2XuCpNU255wNKZ505qi3E=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/ratelimit v0.1.0 h1:U2AruXqeTb4Eh9sYQSTrMhH8Cb7M0Ian2ibBOnBcnAw=
go.uber.org/ratelimit v0.1.0/go.mod h1:2X8KaoNd1J0lZV+PxJk/5+DGbO/tpwLR1m++a7FnB/Y=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand Down Expand Up @@ -914,6 +916,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
6 changes: 6 additions & 0 deletions src/dbnode/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package digest

import (
"hash"
"hash/adler32"

"github.com/m3db/stackadler32"
Expand All @@ -32,6 +33,11 @@ func NewDigest() stackadler32.Digest {
return stackadler32.NewDigest()
}

// NewDigestWriter returns a stateful digest writer.
func NewDigestWriter() hash.Hash32 {
return adler32.New()
}

// Checksum returns the checksum for a buffer.
func Checksum(buf []byte) uint32 {
return adler32.Checksum(buf)
Expand Down
31 changes: 30 additions & 1 deletion src/dbnode/persist/fs/index_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/x/mmap"

"go.uber.org/ratelimit"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -257,11 +258,39 @@ func (r *indexReader) ReadSegmentFileSet() (
r.logger.Warn("warning while mmapping files in reader", zap.Error(warning))
}

// Create limiter right before file read in case value just changed.
limits := r.opts.RuntimeOptionsManager().Get().PersistRateLimitOptions()
limiter := ratelimit.NewUnlimited()
if limits.LimitEnabled() {
// We copy 1mb at a time, so set the limit to be how
// many per second we can call.
megaBytesPerSecond := int(limits.LimitMbps() / 8.0)
limiter = ratelimit.New(megaBytesPerSecond)
}

// Use 1mb batch read size to match the rate limit value.
const batchReadSize = 1024 * 1024
hash := digest.NewDigestWriter()
reader := bytes.NewReader(desc.Bytes)
for {
// Wait for availability.
limiter.Take()

// Read batch now rate limiter allowed progression.
_, err := io.CopyN(hash, reader, batchReadSize)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
}

file := newReadableIndexSegmentFileMmap(segFileType, fd, desc)
result.files = append(result.files, file)
digests.files = append(digests.files, indexReaderReadSegmentFileDigest{
segmentFileType: segFileType,
digest: digest.Checksum(desc.Bytes),
digest: hash.Sum32(),
})

// NB(bodu): Free mmaped bytes after we take the checksum so we don't get memory spikes at bootstrap time.
Expand Down
5 changes: 3 additions & 2 deletions src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ import (
)

const (
bytesPerMegabit = 1024 * 1024 / 8
// BytesPerMegabit is the number of bytes per megabit.
BytesPerMegabit = 1024 * 1024 / 8
)

type persistManagerStatus int
Expand Down Expand Up @@ -513,7 +514,7 @@ func (pm *persistManager) persist(
if pm.start.IsZero() {
pm.start = start
} else if pm.count >= opts.LimitCheckEvery() {
target := time.Duration(float64(time.Second) * float64(pm.bytesWritten) / (rateLimitMbps * bytesPerMegabit))
target := time.Duration(float64(time.Second) * float64(pm.bytesWritten) / (rateLimitMbps * BytesPerMegabit))
if elapsed := start.Sub(pm.start); elapsed < target {
pm.sleepFn(target - elapsed)
// Recapture start for precise timing, might take some time to "wakeup"
Expand Down

0 comments on commit 3cb6f0e

Please sign in to comment.