Skip to content

Commit

Permalink
Merge pull request #19660 from influxdata/19335/rate_limit
Browse files Browse the repository at this point in the history
feat(cmd/influx/write): allow to limit write rate
  • Loading branch information
sranka authored Oct 5, 2020
2 parents 47ed97c + f8ffcb8 commit a8c7254
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
1. [19640](https://github.com/influxdata/influxdb/pull/19640): Turn on Community Templates
1. [19663](https://github.com/influxdata/influxdb/pull/19663): Added InfluxDB v2 Listener, NSD, OPC-UA, and Windows Event Log to the sources page
1. [19662](https://github.com/influxdata/influxdb/pull/19662): Add `max-line-length` switch to `influx write` command to address `token too long` errors for large inputs
1. [19660](https://github.com/influxdata/influxdb/pull/19660): Add --rate-limit option to `influx write`.

### Bug Fixes

Expand Down
59 changes: 59 additions & 0 deletions cmd/influx/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"

"github.com/fujiwara/shapeio"
platform "github.com/influxdata/influxdb/v2"
ihttp "github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kit/signals"
Expand Down Expand Up @@ -41,6 +44,7 @@ type writeFlagsType struct {
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
RateLimit string
}

var writeFlags writeFlagsType
Expand Down Expand Up @@ -91,6 +95,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
cmd.PersistentFlags().StringVar(&writeFlags.RateLimit, "rate-limit", "", "Throttles write, examples: \"5 MB / 5 min\" , \"17kBs\". \"\" (default) disables throttling.")

cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
Expand Down Expand Up @@ -242,6 +247,20 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
csvReader.RowSkipped = rowSkippedListener
r = csvReader
}
// throttle reader if requested
rateLimit, err := ToBytesPerSecond(writeFlags.RateLimit)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
if rateLimit > 0.0 {
// LineReader ensures that original reader is consumed in the smallest possible
// units (at most one protocol line) to avoid bigger pauses in throttling
r = csv2lp.NewLineReader(r)
throttledReader := shapeio.NewReaderWithContext(r, ctx)
throttledReader.SetRateLimit(rateLimit)
r = throttledReader
}

return r, csv2lp.MultiCloser(closers...), nil
}

Expand Down Expand Up @@ -361,3 +380,43 @@ func isCharacterDevice(reader io.Reader) bool {
}
return (info.Mode() & os.ModeCharDevice) == os.ModeCharDevice
}

var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|min)$`)
var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}

// ToBytesPerSecond converts rate from string to number. The supplied string
// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional.
// All spaces are ignored, they can help with formatting. Examples: "5 MB / 5 min", 17kbs. 5.1MB5m.
func ToBytesPerSecond(rateLimit string) (float64, error) {
// ignore all spaces
strVal := strings.ReplaceAll(rateLimit, " ", "")
if len(strVal) == 0 {
return 0, nil
}

matches := rateLimitRegexp.FindStringSubmatch(strVal)
if matches == nil {
return 0, fmt.Errorf("invalid rate limit %q: it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional, rexpexp: %v", strVal, rateLimitRegexp)
}
bytes, err := strconv.ParseFloat(matches[1], 64)
if err != nil {
return 0, fmt.Errorf("invalid rate limit %q: '%v' is not count of bytes: %v", strVal, matches[1], err)
}
bytes = bytes * bytesUnitMultiplier[matches[2]]
var time float64
if len(matches[3]) == 0 {
time = 1 // number is not specified, for example 5kbs or 1Mb/s
} else {
int64Val, err := strconv.ParseUint(matches[3], 10, 32)
if err != nil {
return 0, fmt.Errorf("invalid rate limit %q: time is out of range: %v", strVal, err)
}
if int64Val <= 0 {
return 0, fmt.Errorf("invalid rate limit %q: possitive time expected but %v supplied", strVal, matches[3])
}
time = float64(int64Val)
}
time = time * timeUnitMultiplier[matches[4]]
return bytes / time, nil
}
70 changes: 69 additions & 1 deletion cmd/influx/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ func Test_writeFlags_createLineReader(t *testing.T) {
lines: strings.Split(fileContents, "\n"),
lpData: true,
},
{
name: "read data from CSV file + transform to line protocol + throttle read to 1MB/min",
flags: writeFlagsType{
Files: []string{csvFile1},
RateLimit: "1MBs",
},
lines: []string{
"f1 b=f2,c=f3,d=f4",
},
},
}

for _, test := range tests {
Expand All @@ -244,7 +254,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
defer closer.Close()
require.Nil(t, err)
require.NotNil(t, reader)
if !test.lpData {
if !test.lpData && len(test.flags.RateLimit) == 0 {
csvToLineReader, ok := reader.(*csv2lp.CsvToLineReader)
require.True(t, ok)
require.Equal(t, csvToLineReader.LineNumber, test.firstLineCorrection)
Expand Down Expand Up @@ -562,3 +572,61 @@ func Test_writeFlags_errorsFile(t *testing.T) {
require.Nil(t, err)
require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
}

func Test_ToBytesPerSecond(t *testing.T) {
var tests = []struct {
in string
out float64
error string
}{
{
in: "5 MB / 5 min",
out: float64(5*1024*1024) / float64(5*60),
},
{
in: "17kBs",
out: float64(17 * 1024),
},
{
in: "1B/m",
out: float64(1) / float64(60),
},
{
in: "1B/2sec",
out: float64(1) / float64(2),
},
{
in: "",
out: 0,
},
{
in: "1B/munite",
error: `invalid rate limit "1B/munite": it does not match format COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional`,
},
{
in: ".B/s",
error: `invalid rate limit ".B/s": '.' is not count of bytes:`,
},
{
in: "1B0s",
error: `invalid rate limit "1B0s": possitive time expected but 0 supplied`,
},
{
in: "1MB/42949672950s",
error: `invalid rate limit "1MB/42949672950s": time is out of range`,
},
}
for _, test := range tests {
t.Run(test.in, func(t *testing.T) {
bytesPerSec, err := ToBytesPerSecond(test.in)
if len(test.error) == 0 {
require.Equal(t, test.out, bytesPerSec)
require.Nil(t, err)
} else {
require.NotNil(t, err)
// contains is used, since the error messages contains root cause that may evolve with go versions
require.Contains(t, fmt.Sprintf("%s", err), test.error)
}
})
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190819115812-1474bdeaf2a2
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/fatih/color v1.9.0
github.com/fujiwara/shapeio v0.0.0-20170602072123-c073257dd745
github.com/getkin/kin-openapi v0.2.0
github.com/ghodss/yaml v1.0.0
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fujiwara/shapeio v0.0.0-20170602072123-c073257dd745 h1:+tPNWeI7Uk5JKgSj4IYytZc0mdch+e3Yf8g1sGOg4hQ=
github.com/fujiwara/shapeio v0.0.0-20170602072123-c073257dd745/go.mod h1:/WpqsrSkjgwEG2Es2qnZXbXwHDVbawpdlXJIjJMmnZs=
github.com/getkin/kin-openapi v0.2.0 h1:PbHHtYZpjKwZtGlIyELgA2DploRrsaXztoNNx9HjwNY=
github.com/getkin/kin-openapi v0.2.0/go.mod h1:V1z9xl9oF5Wt7v32ne4FmiF1alpS4dM6mNzoywPOXlk=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down

0 comments on commit a8c7254

Please sign in to comment.